using CounterpointCollective.Utilities;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;
| | | 7 | | |
| | | 8 | | namespace CounterpointCollective.Dataflow |
| | | 9 | | { |
| | 0 | 10 | | public record BatchRunEvent<T>(ResizableBatchTransformBlock<T, T> Source, int BatchSize, double RunMillis, int NextB |
| | | 11 | | |
| | | 12 | | public static class AutoScalingBlockExtensions |
| | | 13 | | { |
| | | 14 | | public static bool DebugBeeps { get; set; } |
| | | 15 | | |
| | | 16 | | private static int c; |
| | | 17 | | |
| | | 18 | | [SuppressMessage("Interoperability", "CA1416:Validate platform compatibility", Justification = "Non critical")] |
| | | 19 | | public static void EnableAutoScaling<T>( |
| | | 20 | | this ResizableBatchTransformBlock<T, T> block, |
| | | 21 | | IBatchSizeStrategy batchSizeStrategy, |
| | | 22 | | Action<BatchRunEvent<T>>? onBatchRun = null |
| | | 23 | | ) |
| | | 24 | | { |
| | | 25 | | var i = c++; |
| | | 26 | | var initialBatchSize = batchSizeStrategy.BatchSize; |
| | | 27 | | block.BatchSize = initialBatchSize; |
| | | 28 | | |
| | | 29 | | block.OnBatch = (_, batch) => |
| | | 30 | | { |
| | | 31 | | var sw = new Stopwatch(); |
| | | 32 | | |
| | | 33 | | if (DebugBeeps) |
| | | 34 | | { |
| | | 35 | | Console.Beep(490 + (i * 50), 100); |
| | | 36 | | } |
| | | 37 | | |
| | | 38 | | sw.Start(); |
| | | 39 | | |
| | | 40 | | return new ActionDisposable(() => |
| | | 41 | | { |
| | | 42 | | var lag = sw.Elapsed.TotalMilliseconds; |
| | | 43 | | var newBatchSize = batchSizeStrategy.UpdateBatchSize(batch.Count, lag); |
| | | 44 | | block.BatchSize = newBatchSize; |
| | | 45 | | |
| | | 46 | | onBatchRun?.Invoke(new(block, batch.Count, lag, newBatchSize)); |
| | | 47 | | }); |
| | | 48 | | |
| | | 49 | | }; |
| | | 50 | | } |
| | | 51 | | } |
| | | 52 | | |
| | | 53 | | |
| | | 54 | | public record BatchSizeCalculation |
| | | 55 | | { |
| | | 56 | | public int MinBatchSize { get; set; } |
| | | 57 | | public int MaxBatchSize { get; set; } |
| | | 58 | | |
| | | 59 | | public int AllowedBatchSizeRange => MaxBatchSize - MinBatchSize; |
| | | 60 | | |
| | | 61 | | public int? OldBatchSize { get; set; } |
| | | 62 | | public int DeltaBatch { get; set; } |
| | | 63 | | public double DeltaThroughput { get; set; } |
| | | 64 | | public double DeltaThroughputPerDeltaBatch => |
| | | 65 | | DeltaBatch == 0 ? 0 : DeltaThroughput / DeltaBatch; |
| | | 66 | | |
| | | 67 | | public double S1Calculated { get; set; } |
| | | 68 | | public double S2SetPoint { get; set; } |
| | | 69 | | public double S3DampenedSetPoint { get; set; } |
| | | 70 | | public double S4Clamped => Math.Clamp(S3DampenedSetPoint, MinBatchSize, MaxBatchSize); |
| | | 71 | | public double? S5EnsureChangedBatchSize { get; set; } |
| | | 72 | | public IEnumerable<double> LowPassBuffer { get; set; } = []; |
| | | 73 | | public int NewBatchSize { get; set; } |
| | | 74 | | } |
| | | 75 | | |
    /// <summary>
    /// Computes the batch size a block should use next, given how the previous batch performed.
    /// </summary>
    public interface IBatchSizeStrategy
    {
        /// <summary>
        /// Feeds the metrics of a completed batch run into the strategy and returns the
        /// batch size to use for the next run.
        /// </summary>
        /// <param name="batchSize">Number of items the completed batch held.</param>
        /// <param name="totalRunMillis">Wall-clock duration of the batch run, in milliseconds.</param>
        /// <returns>The batch size to use for the next run.</returns>
        public int UpdateBatchSize(int batchSize, double totalRunMillis);

        /// <summary>The batch size currently recommended by the strategy.</summary>
        public int BatchSize { get; }

        /// <summary>Implementation-defined snapshot of the strategy's internal state, for diagnostics.</summary>
        public object DebugView { get; }
    }
| | | 83 | | |
| | | 84 | | /// <summary> |
| | | 85 | | /// Default implementation that monitors throughput and adjusts batch size accordingly. |
| | | 86 | | /// </summary> |
    /// <param name="minBatchSize">
    /// The smallest batch size the strategy may select.
    /// </param>
    /// <param name="maxBatchSize">
    /// The largest batch size the strategy may select.
    /// </param>
    /// <param name="initialBatchSize">
    /// The first batch size the strategy will set.
    /// </param>
| | | 93 | | /// <param name="adjustmentSensitivity"> |
| | | 94 | | /// Controls how aggressively the strategy reacts to throughput changes. |
| | | 95 | | /// Higher values cause larger adjustments; lower values make the strategy steadier. |
| | | 96 | | /// </param> |
| | | 97 | | /// <param name="maxAdjustmentFraction"> |
| | | 98 | | /// Limits how far the batch size may move in a single adjustment, |
| | | 99 | | /// expressed as a fraction of the allowed batch size range. |
| | | 100 | | /// Prevents sudden large jumps. |
| | | 101 | | /// </param> |
| | | 102 | | /// <param name="dampeningWindowSize"> |
| | | 103 | | /// Number of recent calculated setpoints to average together to smooth fluctuations. |
| | | 104 | | /// Larger values produce a more stable, slower-responding batch size. |
| | | 105 | | /// </param> |
| | | 106 | | /// <param name="maxQueryTimeSeconds"> |
| | | 107 | | /// Maximum expected duration of a single batch run, used for safety/timeout logic. |
| | | 108 | | /// </param> |
| | | 109 | | public class DefaultBatchSizeStrategy( |
| | | 110 | | int minBatchSize = 1, |
| | | 111 | | int maxBatchSize = 50, |
| | | 112 | | int initialBatchSize = 50, |
| | | 113 | | double adjustmentSensitivity = 0.1, |
| | | 114 | | double maxAdjustmentFraction = 0.1, |
| | | 115 | | int dampeningWindowSize = 5, |
| | | 116 | | int maxQueryTimeSeconds = 60 |
| | | 117 | | ): IBatchSizeStrategy |
| | | 118 | | { |
| | | 119 | | private record BatchStat(int BatchSize, double TotalRunMillis) |
| | | 120 | | { |
| | | 121 | | public double Throughput => BatchSize * 1000 / (TotalRunMillis + 1); |
| | | 122 | | } |
| | | 123 | | |
| | | 124 | | |
| | | 125 | | private readonly Random _rndm = new(); |
| | | 126 | | private BatchStat? prevBatchStat; |
| | | 127 | | private readonly LowPassFilter _lowPassFilter = new(dampeningWindowSize, initialBatchSize); |
| | | 128 | | |
| | | 129 | | public int BatchSize { get; private set; } = initialBatchSize; |
| | | 130 | | |
| | | 131 | | object IBatchSizeStrategy.DebugView => DebugView; |
| | | 132 | | |
| | | 133 | | public BatchSizeCalculation DebugView { get; private set; } = new BatchSizeCalculation(); |
| | | 134 | | |
| | | 135 | | public int UpdateBatchSize(int batchSize, double totalRunMillis) |
| | | 136 | | { |
| | | 137 | | var currStat = new BatchStat(batchSize, totalRunMillis); |
| | | 138 | | DebugView = CalculateNewBatchSize(currStat); |
| | | 139 | | prevBatchStat = currStat; |
| | | 140 | | |
| | | 141 | | BatchSize = DebugView.NewBatchSize; |
| | | 142 | | return BatchSize; |
| | | 143 | | } |
| | | 144 | | |
| | | 145 | | private BatchSizeCalculation CalculateNewBatchSize(BatchStat currStat) |
| | | 146 | | { |
| | | 147 | | var bsc = new BatchSizeCalculation() |
| | | 148 | | { |
| | | 149 | | MinBatchSize = minBatchSize, |
| | | 150 | | MaxBatchSize = maxBatchSize, |
| | | 151 | | }; |
| | | 152 | | var newBatchSize = |
| | | 153 | | prevBatchStat == null ? currStat.BatchSize : CalculateNextBatchSize(currStat, bsc); |
| | | 154 | | |
| | | 155 | | if ((int)Math.Round(newBatchSize) == currStat.BatchSize) |
| | | 156 | | { |
| | | 157 | | newBatchSize = EnsureChange(newBatchSize); |
| | | 158 | | bsc.S5EnsureChangedBatchSize = newBatchSize; |
| | | 159 | | } |
| | | 160 | | |
| | | 161 | | bsc.NewBatchSize = (int) Math.Round(newBatchSize); |
| | | 162 | | return bsc; |
| | | 163 | | } |
| | | 164 | | |
| | | 165 | | private double CalculateNextBatchSize(BatchStat currStat, BatchSizeCalculation bsc) |
| | | 166 | | { |
| | | 167 | | bsc.OldBatchSize = prevBatchStat!.BatchSize; |
| | | 168 | | bsc.DeltaBatch = currStat.BatchSize - prevBatchStat.BatchSize; |
| | | 169 | | bsc.DeltaThroughput = currStat.Throughput - prevBatchStat.Throughput; |
| | | 170 | | |
| | | 171 | | var betterBatchSize = currStat.Throughput > prevBatchStat!.Throughput ? currStat.BatchSize : prevBatchStat.B |
| | | 172 | | var scaledAdjustment = adjustmentSensitivity * bsc.DeltaThroughputPerDeltaBatch * bsc.AllowedBatchSizeRange; |
| | | 173 | | var setpoint = betterBatchSize + scaledAdjustment; |
| | | 174 | | |
| | | 175 | | if (Math.Abs(scaledAdjustment / bsc.AllowedBatchSizeRange) > maxAdjustmentFraction) |
| | | 176 | | { |
| | | 177 | | if (scaledAdjustment > 0) |
| | | 178 | | { |
| | | 179 | | setpoint = currStat.BatchSize + (bsc.AllowedBatchSizeRange * maxAdjustmentFraction); |
| | | 180 | | } |
| | | 181 | | else |
| | | 182 | | { |
| | | 183 | | setpoint = currStat.BatchSize - (bsc.AllowedBatchSizeRange * maxAdjustmentFraction); |
| | | 184 | | } |
| | | 185 | | } |
| | | 186 | | |
| | | 187 | | bsc.S1Calculated = setpoint; |
| | | 188 | | if (setpoint > maxQueryTimeSeconds * currStat.Throughput) |
| | | 189 | | { |
| | | 190 | | setpoint = maxQueryTimeSeconds * currStat.Throughput; |
| | | 191 | | } |
| | | 192 | | |
| | | 193 | | if (setpoint < 1) |
| | | 194 | | { |
| | | 195 | | setpoint = 1; |
| | | 196 | | } |
| | | 197 | | |
| | | 198 | | bsc.S2SetPoint = setpoint; |
| | | 199 | | |
| | | 200 | | |
| | | 201 | | var newBatchSize = _lowPassFilter.Next(setpoint); |
| | | 202 | | int rounded = (int)Math.Round(newBatchSize); |
| | | 203 | | if (rounded == currStat.BatchSize) |
| | | 204 | | { |
| | | 205 | | if (rounded >= currStat.BatchSize) |
| | | 206 | | newBatchSize = currStat.BatchSize + 1; |
| | | 207 | | else |
| | | 208 | | newBatchSize = currStat.BatchSize - 1; |
| | | 209 | | } |
| | | 210 | | |
| | | 211 | | bsc.LowPassBuffer = _lowPassFilter.GetBuffer(); |
| | | 212 | | bsc.S3DampenedSetPoint = newBatchSize; |
| | | 213 | | return bsc.S4Clamped; |
| | | 214 | | } |
| | | 215 | | |
| | | 216 | | private int EnsureChange(double input) |
| | | 217 | | { |
| | | 218 | | var larger = (int)Math.Round(input * 1.1) + 1; |
| | | 219 | | var smaller = (int)Math.Round(input * (1 / 1.1)) - 1; |
| | | 220 | | |
| | | 221 | | if (larger > maxBatchSize) |
| | | 222 | | { |
| | | 223 | | return smaller; |
| | | 224 | | } |
| | | 225 | | else if (smaller <= minBatchSize) |
| | | 226 | | { |
| | | 227 | | return larger; |
| | | 228 | | } |
| | | 229 | | else |
| | | 230 | | #pragma warning disable CA5394 // Do not use insecure randomness |
| | | 231 | | if (_rndm.NextDouble() >= .5) |
| | | 232 | | { |
| | | 233 | | #pragma warning disable CA5394 // Do not use insecure randomness |
| | | 234 | | return larger; |
| | | 235 | | } |
| | | 236 | | else |
| | | 237 | | { |
| | | 238 | | return smaller; |
| | | 239 | | } |
| | | 240 | | } |
| | | 241 | | } |
| | | 242 | | |
    /// <summary>
    /// Asynchronously processes one batch of items, producing an output list.
    /// </summary>
    /// <typeparam name="T">The element type of the batch.</typeparam>
    /// <param name="batch">The items making up one batch.</param>
    /// <returns>A task yielding the list of results for the batch.</returns>
    public delegate Task<List<T>> BatchAction<T>(List<T> batch);
| | | 244 | | } |