< Summary

Information
Class: CounterpointCollective.Dataflow.AutoScalingBlockExtensions
Assembly: Dataflow.Composable
File(s): /builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/AutoScalingBlockExtensions.cs
Line coverage
96%
Covered lines: 25
Uncovered lines: 1
Coverable lines: 26
Total lines: 244
Line coverage: 96.1%
Branch coverage
50%
Covered branches: 2
Total branches: 4
Branch coverage: 50%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
get_DebugBeeps()100%11100%
EnableAutoScaling(...)50%4496%

File(s)

/builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/AutoScalingBlockExtensions.cs

#LineLine coverage
 1using CounterpointCollective.Utilities;
 2using System;
 3using System.Collections.Generic;
 4using System.Diagnostics;
 5using System.Diagnostics.CodeAnalysis;
 6using System.Threading.Tasks;
 7
 8namespace CounterpointCollective.Dataflow
 9{
 10    public record BatchRunEvent<T>(ResizableBatchTransformBlock<T, T> Source, int BatchSize, double RunMillis, int NextB
 11
 12    public static class AutoScalingBlockExtensions
 13    {
 11214        public static bool DebugBeeps { get; set; }
 15
 16        private static int c;
 17
 18        [SuppressMessage("Interoperability", "CA1416:Validate platform compatibility", Justification = "Non critical")]
 19        public static void EnableAutoScaling<T>(
 20            this ResizableBatchTransformBlock<T, T> block,
 21            IBatchSizeStrategy batchSizeStrategy,
 22            Action<BatchRunEvent<T>>? onBatchRun = null
 23        )
 24        {
 125            var i = c++;
 126            var initialBatchSize = batchSizeStrategy.BatchSize;
 127            block.BatchSize = initialBatchSize;
 28
 129            block.OnBatch = (_, batch) =>
 130            {
 11231                var sw = new Stopwatch();
 132
 11233                if (DebugBeeps)
 134                {
 035                    Console.Beep(490 + (i * 50), 100);
 136                }
 137
 11238                sw.Start();
 139
 11240                return new ActionDisposable(() =>
 11241                {
 11242                    var lag = sw.Elapsed.TotalMilliseconds;
 11243                    var newBatchSize = batchSizeStrategy.UpdateBatchSize(batch.Count, lag);
 11244                    block.BatchSize = newBatchSize;
 11245
 11246                    onBatchRun?.Invoke(new(block, batch.Count, lag, newBatchSize));
 11247                });
 148
 149            };
 150        }
 51    }
 52
 53
 54    public record BatchSizeCalculation
 55    {
 56        public int MinBatchSize { get; set; }
 57        public int MaxBatchSize { get; set; }
 58
 59        public int AllowedBatchSizeRange => MaxBatchSize - MinBatchSize;
 60
 61        public int? OldBatchSize { get; set; }
 62        public int DeltaBatch { get; set; }
 63        public double DeltaThroughput { get; set; }
 64        public double DeltaThroughputPerDeltaBatch =>
 65            DeltaBatch == 0 ? 0 : DeltaThroughput / DeltaBatch;
 66
 67        public double S1Calculated { get; set; }
 68        public double S2SetPoint { get; set; }
 69        public double S3DampenedSetPoint { get; set; }
 70        public double S4Clamped => Math.Clamp(S3DampenedSetPoint, MinBatchSize, MaxBatchSize);
 71        public double? S5EnsureChangedBatchSize { get; set; }
 72        public IEnumerable<double> LowPassBuffer { get; set; } = [];
 73        public int NewBatchSize { get; set; }
 74    }
 75
 76    public interface IBatchSizeStrategy
 77    {
 78        public int UpdateBatchSize(int batchSize, double totalRunMillis);
 79        public int BatchSize { get; }
 80
 81        public object DebugView { get; }
 82    }
 83
 84    /// <summary>
 85    /// Default implementation that monitors throughput and adjusts batch size accordingly.
 86    /// </summary>
 87    /// <param name="minBatchSize"></param>
 88    /// The smallest batch size the strategy may select.
 89    /// <param name="maxBatchSize"></param>
 90    /// The largest batch size the strategy may select.
 91    /// <param name="initialBatchSize"></param>
 92    /// The first batch size the strategy will set.
 93    /// <param name="adjustmentSensitivity">
 94    /// Controls how aggressively the strategy reacts to throughput changes.
 95    /// Higher values cause larger adjustments; lower values make the strategy steadier.
 96    /// </param>
 97    /// <param name="maxAdjustmentFraction">
 98    /// Limits how far the batch size may move in a single adjustment,
 99    /// expressed as a fraction of the allowed batch size range.
 100    /// Prevents sudden large jumps.
 101    /// </param>
 102    /// <param name="dampeningWindowSize">
 103    /// Number of recent calculated setpoints to average together to smooth fluctuations.
 104    /// Larger values produce a more stable, slower-responding batch size.
 105    /// </param>
 106    /// <param name="maxQueryTimeSeconds">
 107    /// Maximum expected duration of a single batch run, used for safety/timeout logic.
 108    /// </param>
 109    public class DefaultBatchSizeStrategy(
 110        int minBatchSize = 1,
 111        int maxBatchSize = 50,
 112        int initialBatchSize = 50,
 113        double adjustmentSensitivity = 0.1,
 114        double maxAdjustmentFraction = 0.1,
 115        int dampeningWindowSize = 5,
 116        int maxQueryTimeSeconds = 60
 117    ): IBatchSizeStrategy
 118    {
 119        private record BatchStat(int BatchSize, double TotalRunMillis)
 120        {
 121            public double Throughput => BatchSize * 1000 / (TotalRunMillis + 1);
 122        }
 123
 124
 125        private readonly Random _rndm = new();
 126        private BatchStat? prevBatchStat;
 127        private readonly LowPassFilter _lowPassFilter = new(dampeningWindowSize, initialBatchSize);
 128
 129        public int BatchSize { get; private set; } = initialBatchSize;
 130
 131        object IBatchSizeStrategy.DebugView => DebugView;
 132
 133        public BatchSizeCalculation DebugView { get; private set; } = new BatchSizeCalculation();
 134
 135        public int UpdateBatchSize(int batchSize, double totalRunMillis)
 136        {
 137            var currStat = new BatchStat(batchSize, totalRunMillis);
 138            DebugView = CalculateNewBatchSize(currStat);
 139            prevBatchStat = currStat;
 140
 141            BatchSize = DebugView.NewBatchSize;
 142            return BatchSize;
 143        }
 144
 145        private BatchSizeCalculation CalculateNewBatchSize(BatchStat currStat)
 146        {
 147            var bsc = new BatchSizeCalculation()
 148            {
 149                MinBatchSize = minBatchSize,
 150                MaxBatchSize = maxBatchSize,
 151            };
 152            var newBatchSize =
 153                prevBatchStat == null ? currStat.BatchSize : CalculateNextBatchSize(currStat, bsc);
 154
 155            if ((int)Math.Round(newBatchSize) == currStat.BatchSize)
 156            {
 157                newBatchSize = EnsureChange(newBatchSize);
 158                bsc.S5EnsureChangedBatchSize = newBatchSize;
 159            }
 160
 161            bsc.NewBatchSize = (int) Math.Round(newBatchSize);
 162            return bsc;
 163        }
 164
 165        private double CalculateNextBatchSize(BatchStat currStat, BatchSizeCalculation bsc)
 166        {
 167            bsc.OldBatchSize = prevBatchStat!.BatchSize;
 168            bsc.DeltaBatch = currStat.BatchSize - prevBatchStat.BatchSize;
 169            bsc.DeltaThroughput = currStat.Throughput - prevBatchStat.Throughput;
 170
 171            var betterBatchSize = currStat.Throughput > prevBatchStat!.Throughput ? currStat.BatchSize : prevBatchStat.B
 172            var scaledAdjustment = adjustmentSensitivity * bsc.DeltaThroughputPerDeltaBatch * bsc.AllowedBatchSizeRange;
 173            var setpoint = betterBatchSize + scaledAdjustment;
 174
 175            if (Math.Abs(scaledAdjustment / bsc.AllowedBatchSizeRange) > maxAdjustmentFraction)
 176            {
 177                if (scaledAdjustment > 0)
 178                {
 179                    setpoint = currStat.BatchSize + (bsc.AllowedBatchSizeRange * maxAdjustmentFraction);
 180                }
 181                else
 182                {
 183                    setpoint = currStat.BatchSize - (bsc.AllowedBatchSizeRange * maxAdjustmentFraction);
 184                }
 185            }
 186
 187            bsc.S1Calculated = setpoint;
 188            if (setpoint > maxQueryTimeSeconds * currStat.Throughput)
 189            {
 190                setpoint = maxQueryTimeSeconds * currStat.Throughput;
 191            }
 192
 193            if (setpoint < 1)
 194            {
 195                setpoint = 1;
 196            }
 197
 198            bsc.S2SetPoint = setpoint;
 199
 200
 201            var newBatchSize = _lowPassFilter.Next(setpoint);
 202            int rounded = (int)Math.Round(newBatchSize);
 203            if (rounded == currStat.BatchSize)
 204            {
 205                if (rounded >= currStat.BatchSize)
 206                    newBatchSize = currStat.BatchSize + 1;
 207                else
 208                    newBatchSize = currStat.BatchSize - 1;
 209            }
 210
 211            bsc.LowPassBuffer = _lowPassFilter.GetBuffer();
 212            bsc.S3DampenedSetPoint = newBatchSize;
 213            return bsc.S4Clamped;
 214        }
 215
 216        private int EnsureChange(double input)
 217        {
 218            var larger = (int)Math.Round(input * 1.1) + 1;
 219            var smaller = (int)Math.Round(input * (1 / 1.1)) - 1;
 220
 221            if (larger > maxBatchSize)
 222            {
 223                return smaller;
 224            }
 225            else if (smaller <= minBatchSize)
 226            {
 227                return larger;
 228            }
 229            else
 230#pragma warning disable CA5394 // Do not use insecure randomness
 231            if (_rndm.NextDouble() >= .5)
 232            {
 233#pragma warning disable CA5394 // Do not use insecure randomness
 234                return larger;
 235            }
 236            else
 237            {
 238                return smaller;
 239            }
 240        }
 241    }
 242
 243    public delegate Task<List<T>> BatchAction<T>(List<T> batch);
 244}