< Summary

Information
Class: CounterpointCollective.Dataflow.GroupAdjacentBlock<T1, T2, T3>
Assembly: Dataflow.Composable
File(s): /builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/GroupAdjacentBlock.cs
Line coverage
98%
Covered lines: 60
Uncovered lines: 1
Coverable lines: 61
Total lines: 96
Line coverage: 98.3%
Branch coverage
100%
Covered branches: 14
Total branches: 14
Branch coverage: 100%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
get_Key()100%11100%
GetEnumerator()100%11100%
System.Collections.IEnumerable.GetEnumerator()100%210%
get_TargetSide()100%11100%
get_SourceSide()100%11100%
get_Count()100%11100%
.ctor(...)100%1414100%

File(s)

/builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/GroupAdjacentBlock.cs

#LineLine coverage
 1using CounterpointCollective.Dataflow.Encapsulation;
 2using CounterpointCollective.Dataflow.Fluent;
 3using CounterpointCollective.Dataflow.Internal;
 4using System.Threading.Tasks.Dataflow;
 5
 6namespace CounterpointCollective.Dataflow
 7{
 8    public class GroupAdjacentBlock<T, K, V> : AbstractEncapsulatedPropagatorBlock<T, IGrouping<K, V>>
 9    {
 31610        private sealed class Grouping(K key, IEnumerable<V> values) : IGrouping<K, V>
 11        {
 31612            private readonly K _key = key;
 31613            private readonly IEnumerable<V> _values = [..values];
 14
 67715            public K Key => _key;
 62816            public IEnumerator<V> GetEnumerator() => _values.GetEnumerator();
 017            System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() => GetEnumerator();
 18        }
 19
 20        private readonly BoundedPropagatorBlock<T, IGrouping<K, V>> _boundedPropagatorBlock;
 21
 22        /// <exclude/>
 38123        protected override ITargetBlock<T> TargetSide => _boundedPropagatorBlock;
 24
 25        /// <exclude/>
 40426        protected override ISourceBlock<IGrouping<K, V>> SourceSide => _boundedPropagatorBlock;
 27
 128        public int Count => _boundedPropagatorBlock.Count;
 29
 1430        public GroupAdjacentBlock(
 1431            Func<T, K> keySelector,
 1432            Func<T, V> valueSelector,
 1433            ExecutionDataflowBlockOptions options,
 1434            bool flushOnIdle = false
 1435        )
 36        {
 1437            var inputCount = 0;
 38
 1439            K currentKey = default!;
 1440            List<V> currentGroup = [];
 41
 1442            var outputBlock = new BufferBlock<IGrouping<K, V>>(new() { CancellationToken = options.CancellationToken });
 43
 44            void EmitCurrentGrouping()
 45            {
 46                var g = new Grouping(currentKey, currentGroup);
 47                outputBlock.PostAsserted(g);
 48                currentGroup.Clear();
 49            }
 50
 1451            var a =
 1452                new ActionBlock<T>(e =>
 1453                {
 36954                    var k = keySelector(e);
 36955                    var v = valueSelector(e);
 36956                    if (currentGroup.Count > 0 && !EqualityComparer<K>.Default.Equals(currentKey, k))
 1457                    {
 13558                        EmitCurrentGrouping();
 1459                    }
 36960                    currentKey = k;
 36961                    currentGroup.Add(v);
 36962                    if (flushOnIdle && Interlocked.Decrement(ref inputCount) == 0)
 1463                    {
 18064                        EmitCurrentGrouping();
 1465                    }
 38366                }, new() { CancellationToken = options.CancellationToken, SingleProducerConstrained = options.SingleProd
 67
 1468            a.Completion.ContinueWith(t =>
 1469            {
 1370                if (t.IsCompletedSuccessfully)
 1471                {
 772                    if (currentGroup.Count > 0)
 1473                    {
 174                        EmitCurrentGrouping();
 1475                    }
 776                    outputBlock.Complete();
 1477                }
 1478                else
 1479                {
 680                    currentGroup.Clear();
 681                    if (t.IsFaulted)
 1482                    {
 483                        ((IDataflowBlock)outputBlock).Fault(t.Exception!);
 1484                    }
 1485                }
 2086            });
 87
 1488            _boundedPropagatorBlock = new BoundedPropagatorBlock<T, IGrouping<K, V>>(
 1489                a,
 1490                outputBlock,
 1491                options.BoundedCapacity,
 36992                onEntering: () => Interlocked.Increment(ref inputCount)
 1493            );
 1494        }
 95    }
 96}