< Summary

Information
Class: CounterpointCollective.Dataflow.ParallelDataflowBlockExtensions
Assembly: Dataflow.Composable
File(s): /builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/ParallelDataflowBlockExtensions.cs
Line coverage
90%
Covered lines: 29
Uncovered lines: 3
Coverable lines: 32
Total lines: 80
Line coverage: 90.6%
Branch coverage
90%
Covered branches: 9
Total branches: 10
Branch coverage: 90%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
Par(...) | 100% | 4 | 4 | 100%
Recombine() | 100% | 1 | 1 | 100%
Par(...) | 100% | 1 | 1 | 100%
Par(...) | 100% | 2 | 2 | 100%
Par(...) | 100% | 1 | 1 | 100%
RecombineByVerifyingLockstepOutput(...) | 75% | 5 | 4 | 57.14%

File(s)

/builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/ParallelDataflowBlockExtensions.cs

# | Line | Line coverage
 1using CounterpointCollective.Dataflow.Fluent;
 2using CounterpointCollective.Utilities;
 3using System.Threading.Tasks.Dataflow;
 4
namespace CounterpointCollective.Dataflow
{
    /// <summary>
    /// Extension methods that combine several propagator blocks into a single
    /// <c>ParallelBlock</c> whose per-worker outputs are recombined into one output.
    /// </summary>
    public static class ParallelDataflowBlockExtensions
    {
        /// <summary>
        /// Combines two propagator blocks that accept the same input type into one block
        /// whose output is <paramref name="f"/> applied to the pair of their outputs.
        /// </summary>
        /// <typeparam name="TInput">Input type accepted by both blocks.</typeparam>
        /// <typeparam name="T1">Output type of <paramref name="p1"/>.</typeparam>
        /// <typeparam name="T2">Output type of <paramref name="p2"/>.</typeparam>
        /// <typeparam name="TOutput">Type produced by <paramref name="f"/>.</typeparam>
        /// <param name="p1">Block whose outputs are tagged as the left value.</param>
        /// <param name="p2">Block whose outputs are tagged as the right value.</param>
        /// <param name="f">Combines one output of each block into the final output.</param>
        /// <param name="options">Passed unchanged to the underlying <c>ParallelBlock</c>.</param>
        /// <returns>The <c>ParallelBlock</c> with both tagged pipelines hooked up.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="p1"/>, <paramref name="p2"/> or <paramref name="f"/> is <c>null</c>.
        /// </exception>
        public static IPropagatorBlock<TInput, TOutput> Par<TInput, T1, T2, TOutput>(
            this IPropagatorBlock<TInput, T1> p1,
            IPropagatorBlock<TInput, T2> p2,
            Func<T1, T2, TOutput> f,
            GuaranteedBroadcastBlockOptions options
        )
        {
            // Fail fast: without these checks a null combiner is only discovered when the
            // first pair of outputs is recombined, far away from the faulty call site.
            if (p1 is null)
            {
                throw new ArgumentNullException(nameof(p1));
            }
            if (p2 is null)
            {
                throw new ArgumentNullException(nameof(p2));
            }
            if (f is null)
            {
                throw new ArgumentNullException(nameof(f));
            }

            var ret = new ParallelBlock<TInput, Either<T1, T2>, TOutput>(2, Recombine, options);

            // The slot order of the two tagged values is not assumed: locate the
            // Left-tagged element first, the other slot then holds the Right-tagged one.
            TOutput Recombine(Either<T1, T2>[] inputs)
            {
                var leftIndex = inputs[0].IsLeft ? 0 : 1;
                var rightIndex = (leftIndex + 1) % 2;
                return f(inputs[leftIndex].FromLeft, inputs[rightIndex].FromRight);
            }

            // Tag each block's output so both can flow through one Either-typed worker slot.
            var pt1 =
                p1.Pipeline()
                .SynchronousTransform(e => Either<T1, T2>.Left(e))
                .Build();

            var pt2 =
                p2.Pipeline()
                .SynchronousTransform(e => Either<T1, T2>.Right(e))
                .Build();

            ret.Hookup(pt1, new() { PropagateCompletion = true });
            ret.Hookup(pt2, new() { PropagateCompletion = true });

            return ret;
        }

        /// <summary>
        /// Combines the given blocks using default <c>GuaranteedBroadcastBlockOptions</c>;
        /// see the overload taking options for the recombination rule.
        /// </summary>
        /// <param name="blocks">Blocks to combine.</param>
        /// <returns>The combined block.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="blocks"/> is <c>null</c>.</exception>
        public static IPropagatorBlock<I, I> Par<I>(this IPropagatorBlock<I, I>[] blocks) =>
            blocks.Par(new());

        /// <summary>
        /// Combines the given blocks and recombines their outputs by verifying that all of
        /// them are equal, emitting the first one.
        /// </summary>
        /// <param name="blocks">Blocks to combine.</param>
        /// <param name="options">Passed unchanged to the underlying <c>ParallelBlock</c>.</param>
        /// <returns>The combined block.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="blocks"/> is <c>null</c>.</exception>
        public static IPropagatorBlock<I, I> Par<I>(
            this IPropagatorBlock<I, I>[] blocks,
            GuaranteedBroadcastBlockOptions options
        ) => Par(blocks, RecombineByVerifyingLockstepOutput, options);

        /// <summary>
        /// Creates a <c>ParallelBlock</c> with one worker per element of
        /// <paramref name="blocks"/>, links the blocks as its workers with completion
        /// propagation enabled, and uses <paramref name="recombine"/> to turn the array of
        /// worker outputs into a single output.
        /// </summary>
        /// <typeparam name="I">Input type of every worker block.</typeparam>
        /// <typeparam name="T">Output type of every worker block.</typeparam>
        /// <typeparam name="O">Type produced by <paramref name="recombine"/>.</typeparam>
        /// <param name="blocks">Worker blocks; the array length is the worker count.</param>
        /// <param name="recombine">Combines the worker outputs into one output.</param>
        /// <param name="options">Passed unchanged to the underlying <c>ParallelBlock</c>.</param>
        /// <returns>The configured <c>ParallelBlock</c>.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="blocks"/> or <paramref name="recombine"/> is <c>null</c>.
        /// </exception>
        public static ParallelBlock<I, T, O> Par<I, T, O>(
            this IPropagatorBlock<I, T>[] blocks,
            Func<T[], O> recombine,
            GuaranteedBroadcastBlockOptions options
        )
        {
            // Without this guard a null array surfaces as a NullReferenceException on
            // blocks.Length, which hides which argument was wrong.
            if (blocks is null)
            {
                throw new ArgumentNullException(nameof(blocks));
            }
            if (recombine is null)
            {
                throw new ArgumentNullException(nameof(recombine));
            }

            var res = new ParallelBlock<I, T, O>(
                blocks.Length,
                recombine,
                options
            );
            res.LinkWorkers(blocks, new() { PropagateCompletion = true });
            return res;
        }

        /// <summary>
        /// Returns the first element of <paramref name="outputs"/> after checking, with
        /// <see cref="EqualityComparer{T}.Default"/>, that every other element equals it.
        /// </summary>
        /// <param name="outputs">Worker outputs; element 0 is read unconditionally.</param>
        /// <returns>The first element of <paramref name="outputs"/>.</returns>
        /// <exception cref="ArgumentException">An element differs from the first one.</exception>
        private static I RecombineByVerifyingLockstepOutput<I>(I[] outputs)
        {
            var first = outputs[0];
            for (var i = 1; i < outputs.Length; i++)
            {
                if (!EqualityComparer<I>.Default.Equals(first, outputs[i]))
                {
                    throw new ArgumentException(
                        "Not all outputs were the same. The order of the message probably changed."
                    );
                }
            }
            return first;
        }
    }
}