< Summary

Information
Class: CounterpointCollective.Dataflow.DefaultBatchSizeStrategy
Assembly: Dataflow.Composable
File(s): /builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/AutoScalingBlockExtensions.cs
Line coverage
88%
Covered lines: 59
Uncovered lines: 8
Coverable lines: 67
Total lines: 244
Line coverage: 88%
Branch coverage
70%
Covered branches: 17
Total branches: 24
Branch coverage: 70.8%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
get_BatchSize()100%11100%
get_Throughput()100%11100%
get_BatchSize()100%11100%
CounterpointCollective.Dataflow.IBatchSizeStrategy.get_DebugView()100%210%
get_DebugView()100%11100%
UpdateBatchSize(...)100%11100%
CalculateNewBatchSize(...)100%44100%
CalculateNextBatchSize(...)71.42%151484%
EnsureChange(...)50%7666.66%

File(s)

/builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/AutoScalingBlockExtensions.cs

#LineLine coverage
 1using CounterpointCollective.Utilities;
 2using System;
 3using System.Collections.Generic;
 4using System.Diagnostics;
 5using System.Diagnostics.CodeAnalysis;
 6using System.Threading.Tasks;
 7
 8namespace CounterpointCollective.Dataflow
 9{
 10    public record BatchRunEvent<T>(ResizableBatchTransformBlock<T, T> Source, int BatchSize, double RunMillis, int NextB
 11
 12    public static class AutoScalingBlockExtensions
 13    {
 14        public static bool DebugBeeps { get; set; }
 15
 16        private static int c;
 17
 18        [SuppressMessage("Interoperability", "CA1416:Validate platform compatibility", Justification = "Non critical")]
 19        public static void EnableAutoScaling<T>(
 20            this ResizableBatchTransformBlock<T, T> block,
 21            IBatchSizeStrategy batchSizeStrategy,
 22            Action<BatchRunEvent<T>>? onBatchRun = null
 23        )
 24        {
 25            var i = c++;
 26            var initialBatchSize = batchSizeStrategy.BatchSize;
 27            block.BatchSize = initialBatchSize;
 28
 29            block.OnBatch = (_, batch) =>
 30            {
 31                var sw = new Stopwatch();
 32
 33                if (DebugBeeps)
 34                {
 35                    Console.Beep(490 + (i * 50), 100);
 36                }
 37
 38                sw.Start();
 39
 40                return new ActionDisposable(() =>
 41                {
 42                    var lag = sw.Elapsed.TotalMilliseconds;
 43                    var newBatchSize = batchSizeStrategy.UpdateBatchSize(batch.Count, lag);
 44                    block.BatchSize = newBatchSize;
 45
 46                    onBatchRun?.Invoke(new(block, batch.Count, lag, newBatchSize));
 47                });
 48
 49            };
 50        }
 51    }
 52
 53
 54    public record BatchSizeCalculation
 55    {
 56        public int MinBatchSize { get; set; }
 57        public int MaxBatchSize { get; set; }
 58
 59        public int AllowedBatchSizeRange => MaxBatchSize - MinBatchSize;
 60
 61        public int? OldBatchSize { get; set; }
 62        public int DeltaBatch { get; set; }
 63        public double DeltaThroughput { get; set; }
 64        public double DeltaThroughputPerDeltaBatch =>
 65            DeltaBatch == 0 ? 0 : DeltaThroughput / DeltaBatch;
 66
 67        public double S1Calculated { get; set; }
 68        public double S2SetPoint { get; set; }
 69        public double S3DampenedSetPoint { get; set; }
 70        public double S4Clamped => Math.Clamp(S3DampenedSetPoint, MinBatchSize, MaxBatchSize);
 71        public double? S5EnsureChangedBatchSize { get; set; }
 72        public IEnumerable<double> LowPassBuffer { get; set; } = [];
 73        public int NewBatchSize { get; set; }
 74    }
 75
 76    public interface IBatchSizeStrategy
 77    {
 78        public int UpdateBatchSize(int batchSize, double totalRunMillis);
 79        public int BatchSize { get; }
 80
 81        public object DebugView { get; }
 82    }
 83
 84    /// <summary>
 85    /// Default implementation that monitors throughput and adjusts batch size accordingly.
 86    /// </summary>
 87    /// <param name="minBatchSize"></param>
 88    /// The smallest batch size the strategy may select.
 89    /// <param name="maxBatchSize"></param>
 90    /// The largest batch size the strategy may select.
 91    /// <param name="initialBatchSize"></param>
 92    /// The first batch size the strategy will set.
 93    /// <param name="adjustmentSensitivity">
 94    /// Controls how aggressively the strategy reacts to throughput changes.
 95    /// Higher values cause larger adjustments; lower values make the strategy steadier.
 96    /// </param>
 97    /// <param name="maxAdjustmentFraction">
 98    /// Limits how far the batch size may move in a single adjustment,
 99    /// expressed as a fraction of the allowed batch size range.
 100    /// Prevents sudden large jumps.
 101    /// </param>
 102    /// <param name="dampeningWindowSize">
 103    /// Number of recent calculated setpoints to average together to smooth fluctuations.
 104    /// Larger values produce a more stable, slower-responding batch size.
 105    /// </param>
 106    /// <param name="maxQueryTimeSeconds">
 107    /// Maximum expected duration of a single batch run, used for safety/timeout logic.
 108    /// </param>
 1109    public class DefaultBatchSizeStrategy(
 1110        int minBatchSize = 1,
 1111        int maxBatchSize = 50,
 1112        int initialBatchSize = 50,
 1113        double adjustmentSensitivity = 0.1,
 1114        double maxAdjustmentFraction = 0.1,
 1115        int dampeningWindowSize = 5,
 1116        int maxQueryTimeSeconds = 60
 1117    ): IBatchSizeStrategy
 118    {
 2001119        private record BatchStat(int BatchSize, double TotalRunMillis)
 120        {
 555121            public double Throughput => BatchSize * 1000 / (TotalRunMillis + 1);
 122        }
 123
 124
 1125        private readonly Random _rndm = new();
 126        private BatchStat? prevBatchStat;
 1127        private readonly LowPassFilter _lowPassFilter = new(dampeningWindowSize, initialBatchSize);
 128
 226129        public int BatchSize { get; private set; } = initialBatchSize;
 130
 0131        object IBatchSizeStrategy.DebugView => DebugView;
 132
 225133        public BatchSizeCalculation DebugView { get; private set; } = new BatchSizeCalculation();
 134
 135        public int UpdateBatchSize(int batchSize, double totalRunMillis)
 136        {
 112137            var currStat = new BatchStat(batchSize, totalRunMillis);
 112138            DebugView = CalculateNewBatchSize(currStat);
 112139            prevBatchStat = currStat;
 140
 112141            BatchSize = DebugView.NewBatchSize;
 112142            return BatchSize;
 143        }
 144
 145        private BatchSizeCalculation CalculateNewBatchSize(BatchStat currStat)
 146        {
 112147            var bsc = new BatchSizeCalculation()
 112148            {
 112149                MinBatchSize = minBatchSize,
 112150                MaxBatchSize = maxBatchSize,
 112151            };
 112152            var newBatchSize =
 112153                prevBatchStat == null ? currStat.BatchSize : CalculateNextBatchSize(currStat, bsc);
 154
 112155            if ((int)Math.Round(newBatchSize) == currStat.BatchSize)
 156            {
 1157                newBatchSize = EnsureChange(newBatchSize);
 1158                bsc.S5EnsureChangedBatchSize = newBatchSize;
 159            }
 160
 112161            bsc.NewBatchSize = (int) Math.Round(newBatchSize);
 112162            return bsc;
 163        }
 164
 165        private double CalculateNextBatchSize(BatchStat currStat, BatchSizeCalculation bsc)
 166        {
 111167            bsc.OldBatchSize = prevBatchStat!.BatchSize;
 111168            bsc.DeltaBatch = currStat.BatchSize - prevBatchStat.BatchSize;
 111169            bsc.DeltaThroughput = currStat.Throughput - prevBatchStat.Throughput;
 170
 111171            var betterBatchSize = currStat.Throughput > prevBatchStat!.Throughput ? currStat.BatchSize : prevBatchStat.B
 111172            var scaledAdjustment = adjustmentSensitivity * bsc.DeltaThroughputPerDeltaBatch * bsc.AllowedBatchSizeRange;
 111173            var setpoint = betterBatchSize + scaledAdjustment;
 174
 111175            if (Math.Abs(scaledAdjustment / bsc.AllowedBatchSizeRange) > maxAdjustmentFraction)
 176            {
 111177                if (scaledAdjustment > 0)
 178                {
 62179                    setpoint = currStat.BatchSize + (bsc.AllowedBatchSizeRange * maxAdjustmentFraction);
 180                }
 181                else
 182                {
 49183                    setpoint = currStat.BatchSize - (bsc.AllowedBatchSizeRange * maxAdjustmentFraction);
 184                }
 185            }
 186
 111187            bsc.S1Calculated = setpoint;
 111188            if (setpoint > maxQueryTimeSeconds * currStat.Throughput)
 189            {
 0190                setpoint = maxQueryTimeSeconds * currStat.Throughput;
 191            }
 192
 111193            if (setpoint < 1)
 194            {
 1195                setpoint = 1;
 196            }
 197
 111198            bsc.S2SetPoint = setpoint;
 199
 200
 111201            var newBatchSize = _lowPassFilter.Next(setpoint);
 111202            int rounded = (int)Math.Round(newBatchSize);
 111203            if (rounded == currStat.BatchSize)
 204            {
 0205                if (rounded >= currStat.BatchSize)
 0206                    newBatchSize = currStat.BatchSize + 1;
 207                else
 0208                    newBatchSize = currStat.BatchSize - 1;
 209            }
 210
 111211            bsc.LowPassBuffer = _lowPassFilter.GetBuffer();
 111212            bsc.S3DampenedSetPoint = newBatchSize;
 111213            return bsc.S4Clamped;
 214        }
 215
 216        private int EnsureChange(double input)
 217        {
 1218            var larger = (int)Math.Round(input * 1.1) + 1;
 1219            var smaller = (int)Math.Round(input * (1 / 1.1)) - 1;
 220
 1221            if (larger > maxBatchSize)
 222            {
 0223                return smaller;
 224            }
 1225            else if (smaller <= minBatchSize)
 226            {
 0227                return larger;
 228            }
 229            else
 230#pragma warning disable CA5394 // Do not use insecure randomness
 1231            if (_rndm.NextDouble() >= .5)
 232            {
 233#pragma warning disable CA5394 // Do not use insecure randomness
 0234                return larger;
 235            }
 236            else
 237            {
 1238                return smaller;
 239            }
 240        }
 241    }
 242
 243    public delegate Task<List<T>> BatchAction<T>(List<T> batch);
 244}