< Summary

Information
Class: CounterpointCollective.Dataflow.ResizableBatchTransformBlock<T1, T2>
Assembly: Dataflow.Composable
File(s): /builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/ResizableBatchTransformBlock.cs
Line coverage
91%
Covered lines: 91
Uncovered lines: 8
Coverable lines: 99
Total lines: 192
Line coverage: 91.9%
Branch coverage
73%
Covered branches: 25
Total branches: 34
Branch coverage: 73.5%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
get_TargetSide()100%11100%
get_SourceSide()100%11100%
get_IsBottleneckCurrently()0%620%
get_IsFull()100%210%
get_BoundedCapacity()100%210%
set_BoundedCapacity(...)100%210%
get_BatchSize()100%11100%
set_BatchSize(...)50%22100%
get_OnBatch()100%11100%
get_Count()100%11100%
get_InputCount()100%11100%
get_InProgressCount()100%11100%
get_OutputCount()100%11100%
get_LongestWait()0%620%
.ctor(...)100%44100%
get_OnBatchSizeChanged()100%11100%
set_OnBatchSizeChanged(...)0%620%
RunBatchIfNeeded()100%11100%
RunBatchesAsync()100%44100%
TryGetBatch()100%1010100%
CanRunBatch()75%88100%

File(s)

/builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/ResizableBatchTransformBlock.cs

#LineLine coverage
 1using CounterpointCollective.Dataflow.Encapsulation;
 2using CounterpointCollective.Dataflow.Internal;
 3using CounterpointCollective.Threading;
 4using System;
 5using System.Collections.Generic;
 6using System.Diagnostics.CodeAnalysis;
 7using System.Threading;
 8using System.Threading.Tasks;
 9using System.Threading.Tasks.Dataflow;
 10
 11namespace CounterpointCollective.Dataflow
 12{
 13#pragma warning disable CA1001 // Types that own disposable fields should be disposable
 14    public sealed class ResizableBatchTransformBlock<I, O>
 15#pragma warning restore CA1001 // Types that own disposable fields should be disposable
 16        : AbstractEncapsulatedPropagatorBlock<I, O>, IResizablePropagatorBlock<I, O>
 17    {
 1016018        protected override ITargetBlock<I> TargetSide => _boundedPropagatorBlock;
 19
 2099820        protected override ISourceBlock<O> SourceSide => _boundedPropagatorBlock;
 21
 22        private readonly SemaphoreSlim _semRunNextBatch;
 23
 24        private readonly BoundedPropagatorBlock<I,O> _boundedPropagatorBlock;
 25        private readonly ResizableBufferBlock<I> _batchGatherBlock;
 26        private readonly TransformManyBlock<I[], O> _transformManyBlock;
 27
 28        private DateTime earliestEntrance;
 29
 030        public bool IsBottleneckCurrently => OutputCount == 0 && IsFull;
 31
 032        public bool IsFull => _batchGatherBlock.Count >= BatchSize;
 33
 34        public int BoundedCapacity
 35        {
 036            get => _boundedPropagatorBlock.BoundedCapacity;
 037            set => _boundedPropagatorBlock.BoundedCapacity = value;
 38        }
 39
 40
 41        public int BatchSize
 42        {
 507243            get => _batchGatherBlock.BoundedCapacity;
 44            set
 45            {
 12646                _batchGatherBlock.BoundedCapacity = value;
 12647                OnBatchSizeChanged?.Invoke(value);
 12648                RunBatchIfNeeded();
 12649            }
 50        }
 51
 12952        public Func<ResizableBatchTransformBlock<I, O>, IList<I>, IDisposable>? OnBatch { get; set; }
 53
 454        public int Count => _boundedPropagatorBlock.Count;
 55
 56        private int inputCount;
 757        public int InputCount => Volatile.Read(ref inputCount);
 58
 159        public int InProgressCount => Count - InputCount - OutputCount;
 60
 1461        public int OutputCount => _transformManyBlock.OutputCount;
 62
 063        public TimeSpan LongestWait => InputCount == 0 ? TimeSpan.Zero : DateTime.UtcNow - earliestEntrance;
 64
 1065        private readonly AsyncAutoResetEventSlim _sem = new(false);
 66
 67        public Action<int>? OnBatchSizeChanged
 68        {
 12669            get;
 70            set
 71            {
 072                field = value;
 073                field?.Invoke(BatchSize);
 074            }
 75        }
 76
 1077        public ResizableBatchTransformBlock(
 1078            Func<I[], Task<IEnumerable<O>>> transform,
 1079            int initialBatchSize,
 1080            ExecutionDataflowBlockOptions? options = null
 1081        )
 82        {
 1083            options ??= new();
 1084            _batchGatherBlock = new(new() { CancellationToken = options.CancellationToken }, RunBatchIfNeeded);
 85
 1086            _semRunNextBatch = new SemaphoreSlim(options.MaxDegreeOfParallelism - 1);
 1087            _transformManyBlock = new TransformManyBlock<I[], O>(
 1088                async batch =>
 1089                {
 12890                    var d = OnBatch?.Invoke(this, batch);
 1091                    IEnumerable<O> ret;
 1092                    try
 1093                    {
 12894                        ret = await transform(batch);
 12795                    }
 1096                    finally
 1097                    {
 12898                        d?.Dispose();
 1099                    }
 127100                    _semRunNextBatch.Release();
 127101                    return ret;
 127102                },
 10103                new() { CancellationToken = options.CancellationToken, MaxDegreeOfParallelism = options.MaxDegreeOfParal
 10104            );
 105
 10106            BatchSize = initialBatchSize;
 107
 10108            _boundedPropagatorBlock = new BoundedPropagatorBlock<I, O>(
 10109                _batchGatherBlock,
 10110                _transformManyBlock,
 10111                options.BoundedCapacity,
 10112                onEntered: () =>
 10113                {
 10155114                    if (Interlocked.Increment(ref inputCount) == 1)
 10115                    {
 12116                        earliestEntrance = DateTime.UtcNow;
 10117                    }
 10155118                }
 10119            );
 120
 10121            Task.Run(async () =>
 10122            {
 20123                var runBatchesTask = Task.Run(() => RunBatchesAsync());
 10124                var t = await Task.WhenAny(_boundedPropagatorBlock.InputCompletion);
 5125                RunBatchIfNeeded();
 5126                await _batchGatherBlock.Completion;
 4127                _sem.Terminate();
 4128                await runBatchesTask;
 10129
 4130                _ = t.PropagateCompletion(_transformManyBlock);
 14131            });
 10132        }
 133
 134
 10198135        private void RunBatchIfNeeded() => _sem.Set();
 136
 137        private async Task RunBatchesAsync()
 138        {
 139            while (true)
 140            {
 174141                var t = await Task.WhenAny(_sem.WaitOneAsync());
 170142                if (!t.IsCompletedSuccessfully)
 143                {
 144                    break;
 145                }
 146
 293147                while (TryGetBatch(out var batch))
 148                {
 129149                    Interlocked.Add(ref inputCount, -batch.Count);
 129150                    _transformManyBlock.PostAsserted([.. batch]);
 128151                    await _semRunNextBatch.WaitAsync();
 127152                }
 153            }
 154
 155            bool TryGetBatch([MaybeNullWhen(false)] out IList<I> batch)
 156            {
 293157                if (CanRunBatch())
 158                {
 159                    //First try to get maximally BatchSize items synchronously.
 160                    //The buffer may be overfull though, in case we resized to a smaller BatchSize, so we need to check 
 129161                    if (!(_batchGatherBlock.Count <= BatchSize && _batchGatherBlock.TryReceiveAll(out batch)))
 162                    {
 163                        //Fall back to slow mode.
 51164                        batch = [];
 4647165                        while (batch.Count < BatchSize && _batchGatherBlock.TryReceive(out var item))
 166                        {
 4596167                            batch.Add(item);
 4596168                        }
 169                    }
 129170                    return true;
 171                }
 164172                batch = null;
 164173                return false;
 174            }
 175
 176            // Normal             -> buffer.Count >= BatchSize
 177            // Faulting/canceling -> false
 178            // Complete           -> buffer.Count > 0
 179            bool CanRunBatch()
 180            {
 293181                var finishedUnsuccessful = Completion.IsFaulted || Completion.IsCanceled;
 293182                return
 293183                    !finishedUnsuccessful
 293184                    &&
 293185                    (
 293186                        _batchGatherBlock.Count >= BatchSize
 293187                        || (_boundedPropagatorBlock.InputCompletion.IsCompletedSuccessfully && _batchGatherBlock.Count >
 293188                    );
 189            }
 4190        }
 191    }
 192}