Summary

Information
Class: CounterpointCollective.Dataflow.Fluent.PipelineBuilderExtensions
Assembly: Dataflow.Composable
File(s): /builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/Fluent/PipelineBuilderExtensions.cs
Line coverage
63%
Covered lines: 12
Uncovered lines: 7
Coverable lines: 19
Total lines: 132
Line coverage: 63.1%
Branch coverage
25%
Covered branches: 3
Total branches: 12
Branch coverage: 25%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
Choice(...) | 50% | 2 | 2 | 100%
GroupAdjacent(...) | 0% | 6 | 2 | 0%
GroupAdjacent(...) | 100% | 2 | 1 | 0%
Tee(...) | 100% | 2 | 2 | 100%
Parallel(...) | 0% | 6 | 2 | 0%
Parallel(...) | 0% | 6 | 2 | 0%
ResizableBuffer(...) | 0% | 6 | 2 | 0%
SynchronousFilter(...) | 100% | 1 | 1 | 100%
SynchronousTransform(...) | 100% | 1 | 1 | 100%
WithNotification(...) | 100% | 1 | 1 | 100%

File(s)

/builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/Fluent/PipelineBuilderExtensions.cs

# | Line | Line coverage
 1using CounterpointCollective.Dataflow.Fluent;
 2using CounterpointCollective.Dataflow.Notifying;
 3using System.Threading.Tasks.Dataflow;
 4
 5namespace CounterpointCollective.Dataflow.Fluent
 6{
 7    public static class PipelineBuilderExtensions
 8    {
 9        /// <summary>
 10        /// Links a choice block to the end of the current IPipelineBuilder.
 11        /// </summary>
 12        public static IPipelineBuilder<TFirst, ChoiceBlock<T, O>> Choice<TFirst, T, O>(
 13            this IPipelineBuilder<TFirst, ISourceBlock<T>> p,
 14            Predicate<T> predicate,
 15            IPropagatorBlock<T, O> thenBlock,
 16            IPropagatorBlock<T, O> elseBlock,
 17            ExecutionDataflowBlockOptions? options = null
 18        ) where TFirst : IDataflowBlock
 19        {
 120            var choiceBlock =
 121                new ChoiceBlock<T, O>(predicate, thenBlock, elseBlock, options ?? new());
 122            return p.LinkTo(choiceBlock, new DataflowLinkOptions() { PropagateCompletion = true });
 23        }
 24
 25        /// <summary>
 26        /// Links a GroupAdjacentBlock to the end of the current IPipelineBuilder.
 27        /// </summary>
 28        public static IPipelineBuilder<TFirst, GroupAdjacentBlock<T, TKey, T>> GroupAdjacent<TFirst, T, TKey>(
 29            this IPipelineBuilder<TFirst, ISourceBlock<T>> p,
 30            Func<T, TKey> keySelector,
 31            ExecutionDataflowBlockOptions options,
 32            bool flushOnIdle = false
 33        ) where TFirst : IDataflowBlock
 034        => GroupAdjacent(p, keySelector, e => e, options, flushOnIdle);
 35
 36        /// <summary>
 37        /// Links a GroupAdjacentBlock to the end of the current IPipelineBuilder.
 38        /// </summary>
 39        public static IPipelineBuilder<TFirst, GroupAdjacentBlock<T, TKey, TValue>> GroupAdjacent<TFirst, T, TKey, TValue>(
 40            this IPipelineBuilder<TFirst, ISourceBlock<T>> p,
 41            Func<T, TKey> keySelector,
 42            Func<T, TValue> valueSelector,
 43            ExecutionDataflowBlockOptions options,
 44            bool flushOnIdle = false
 45        ) where TFirst : IDataflowBlock
 046        => p.LinkTo(new GroupAdjacentBlock<T, TKey, TValue>(keySelector, valueSelector, options, flushOnIdle), new Dataf
 47
 48        /// <summary>
 49        /// Links a TeeBlock to the end of the current IPipelineBuilder.
 50        /// </summary>
 51        public static IPipelineBuilder<TFirst, ISourceBlock<O>> Tee<TFirst, I, T, O>(
 52            this IPipelineBuilder<TFirst, ISourceBlock<I>> p,
 53            IPropagatorBlock<I, T> inner,
 54            Func<I, T, O> combinator,
 55            DataflowBlockOptions? options = null
 56        ) where TFirst : IDataflowBlock
 57        {
 158            var t = new TeeBlock<I, T, O>(
 159                inner,
 160                combinator,
 161                options ?? new DataflowBlockOptions()
 162            );
 163            return p.LinkTo(t, new() { PropagateCompletion = true });
 64        }
 65
 66        /// <summary>
 67        /// Links a ParallelBlock to the end of the current IPipelineBuilder.
 68        /// </summary>
 69        public static IPipelineBuilder<TFirst, ISourceBlock<TOutput>> Parallel<TFirst, TInput, T1, T2, TOutput>(
 70            this IPipelineBuilder<TFirst, ISourceBlock<TInput>> p,
 71            IPropagatorBlock<TInput, T1> block1,
 72            IPropagatorBlock<TInput, T2> block2,
 73            Func<T1, T2, TOutput> f,
 74            GuaranteedBroadcastBlockOptions? options = null
 75        ) where TFirst : IDataflowBlock
 76        {
 077            var inner = block1.Par(block2, f, options ?? new());
 078            return p.LinkTo(inner, new() { PropagateCompletion = true });
 79        }
 80
 81        /// <summary>
 82        /// Links a ParallelBlock to the end of the current IPipelineBuilder.
 83        /// </summary>
 84        public static IPipelineBuilder<TFirst, ISourceBlock<TOutput>> Parallel<TFirst, TOutput>(
 85            this IPipelineBuilder<TFirst, ISourceBlock<TOutput>> p,
 86            IPropagatorBlock<TOutput, TOutput>[] blocks,
 87            GuaranteedBroadcastBlockOptions? options = null
 88        ) where TFirst : IDataflowBlock
 89        {
 090            var inner = blocks.Par(options ?? new());
 091            return p.LinkTo(inner, new() { PropagateCompletion = true });
 92        }
 93
 94        /// <summary>
 95        /// Links a ResizableBuffer to the end of the current IPipelineBuilder.
 96        /// </summary>
 97        public static IPipelineBuilder<TFirst, ResizableBufferBlock<TOutput>> ResizableBuffer<TFirst, TOutput>(
 98            this IPipelineBuilder<TFirst, ISourceBlock<TOutput>> p,
 99            DataflowBlockOptions? options = null,
 100            Action? onEntered = null
 101        ) where TFirst : IDataflowBlock
 0102        => p.LinkTo(new ResizableBufferBlock<TOutput>(options ?? new(), onEntered), new() { PropagateCompletion = true });
 103
 104        /// <summary>
 105        /// Wraps the last block of the current IPipelineBuilder in a SynchronousFilterBlock.
 106        /// </summary>
 107        public static IPipelineBuilder<TFirst, SynchronousFilterBlock<T>> SynchronousFilter<TFirst, T>(
 108            this IPipelineBuilder<TFirst, ISourceBlock<T>> p,
 109            Predicate<T> predicate,
 110            FilterMode mode = FilterMode.Block
 111        ) where TFirst : IDataflowBlock
 52112        => p.SelectLast(e => new SynchronousFilterBlock<T>(e, predicate, mode));
 113
 114        /// <summary>
 115        /// Wraps the last block of the current IPipelineBuilder in a SynchronousTransformingBlock.
 116        /// </summary>
 117        public static IPipelineBuilder<TFirst, SynchronousTransformingBlock<T, O>> SynchronousTransform<TFirst, T, O>(
 118            this IPipelineBuilder<TFirst, ISourceBlock<T>> p,
 119            Func<T, O> transform
 120        ) where TFirst : IDataflowBlock
 56121        => p.SelectLast(e => new SynchronousTransformingBlock<T, O>(e, transform));
 122
 123        /// <summary>
 124        /// Wraps the last block of the current IPipelineBuilder in a NotifyingSourceBlock.
 125        /// </summary>
 126        public static IPipelineBuilder<TFirst, NotifyingSourceBlock<T>> WithNotification<TFirst, T>(
 127            this IPipelineBuilder<TFirst, ISourceBlock<T>> p,
 128            ConfigureHooks<T> hooks
 129        ) where TFirst : IDataflowBlock
 112130        => p.SelectLast(l => l.WithNotification(hooks));
 131    }
 132}

Methods/Properties

Choice(CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<TFirst,System.Threading.Tasks.Dataflow.ISourceBlock`1<T>>,System.Predicate`1<T>,System.Threading.Tasks.Dataflow.IPropagatorBlock`2<T,O>,System.Threading.Tasks.Dataflow.IPropagatorBlock`2<T,O>,System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions)
GroupAdjacent(CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<TFirst,System.Threading.Tasks.Dataflow.ISourceBlock`1<T>>,System.Func`2<T,TKey>,System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions,System.Boolean)
GroupAdjacent(CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<TFirst,System.Threading.Tasks.Dataflow.ISourceBlock`1<T>>,System.Func`2<T,TKey>,System.Func`2<T,TValue>,System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions,System.Boolean)
Tee(CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<TFirst,System.Threading.Tasks.Dataflow.ISourceBlock`1<I>>,System.Threading.Tasks.Dataflow.IPropagatorBlock`2<I,T>,System.Func`3<I,T,O>,System.Threading.Tasks.Dataflow.DataflowBlockOptions)
Parallel(CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<TFirst,System.Threading.Tasks.Dataflow.ISourceBlock`1<TInput>>,System.Threading.Tasks.Dataflow.IPropagatorBlock`2<TInput,T1>,System.Threading.Tasks.Dataflow.IPropagatorBlock`2<TInput,T2>,System.Func`3<T1,T2,TOutput>,CounterpointCollective.Dataflow.GuaranteedBroadcastBlockOptions)
Parallel(CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<TFirst,System.Threading.Tasks.Dataflow.ISourceBlock`1<TOutput>>,System.Threading.Tasks.Dataflow.IPropagatorBlock`2<TOutput,TOutput>[],CounterpointCollective.Dataflow.GuaranteedBroadcastBlockOptions)
ResizableBuffer(CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<TFirst,System.Threading.Tasks.Dataflow.ISourceBlock`1<TOutput>>,System.Threading.Tasks.Dataflow.DataflowBlockOptions,System.Action)
SynchronousFilter(CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<TFirst,System.Threading.Tasks.Dataflow.ISourceBlock`1<T>>,System.Predicate`1<T>,CounterpointCollective.Dataflow.FilterMode)
SynchronousTransform(CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<TFirst,System.Threading.Tasks.Dataflow.ISourceBlock`1<T>>,System.Func`2<T,O>)
WithNotification(CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<TFirst,System.Threading.Tasks.Dataflow.ISourceBlock`1<T>>,CounterpointCollective.Dataflow.Notifying.ConfigureHooks`1<T>)