< Summary

Information
Class: CounterpointCollective.Dataflow.Fluent.PipelineBuilderExtensions
Assembly: Dataflow.Fluent
File(s): /builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Fluent/Dataflow/Fluent/PipelineBuilderExtensions.cs
Line coverage
47%
Covered lines: 26
Uncovered lines: 29
Coverable lines: 55
Total lines: 285
Line coverage: 47.2%
Branch coverage
29%
Covered branches: 7
Total branches: 24
Branch coverage: 29.1%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

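| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| Pipeline(...) | 100% | 1 | 1 | 100% |
| Build(...) | 50% | 4 | 4 | 75% |
| BuildSourceBlock(...) | 50% | 2 | 2 | 75% |
| BuildTargetBlock(...) | 50% | 2 | 2 | 75% |
| BuildDataflowBlock(...) | 0% | 6 | 2 | 0% |
| Select(...) | 100% | 2 | 1 | 0% |
| SelectLast(...) | 100% | 1 | 1 | 100% |
| SelectFirst(...) | 100% | 2 | 1 | 0% |
| LinkTo(...) | 100% | 1 | 1 | 100% |
| LinkTo(...) | 100% | 2 | 1 | 0% |
| SubPipeline(...) | 100% | 1 | 1 | 100% |
| Buffer(...) | 0% | 6 | 2 | 0% |
| Transform(...) | 100% | 2 | 2 | 100% |
| TransformMany(...) | 50% | 2 | 2 | 100% |
| TransformMany(...) | 0% | 6 | 2 | 0% |
| TransformMany(...) | 0% | 6 | 2 | 0% |
| Batch(...) | 0% | 6 | 2 | 0% |
| Action(...) | 0% | 6 | 2 | 0% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| get_First() | 100% | 1 | 1 | 100% |
| get_Last() | 100% | 1 | 1 | 100% |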

File(s)

/builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Fluent/Dataflow/Fluent/PipelineBuilderExtensions.cs

# | Line | Line coverage
 1using CounterpointCollective.Dataflow.Encapsulation;
 2using System.Threading.Tasks.Dataflow;
 3
 4namespace CounterpointCollective.Dataflow.Fluent
 5{
 6    public static class PipelineBuilderExtensions
 7    {
 8        /// <summary>
 9        /// Creates an IPipelineBuilder wrapping the given dataflow block.
 10        /// </summary>
 11        /// <typeparam name="TBlock">The type of the dataflow block.</typeparam>
 12        /// <param name="block">The dataflow block to be wrapped by the IPipelineBuilder.</param>
 13        /// <returns>
 14        /// A new <see cref="IPipelineBuilder{TBlock, TBlock}"/> representing the pipeline
 15        /// starting and ending with <paramref name="block"/>. This allows additional blocks to be appended
 16        /// fluently to build a larger pipeline, e.g. using methods like <c>Transform</c> or <c>Tee</c>.
 17        /// </returns>
 18        public static
 19            IPipelineBuilder<TBlock, TBlock>
 20            Pipeline<TBlock>(this TBlock block) where TBlock : IDataflowBlock
 12321            => new PipelineBuilder<TBlock, TBlock>(block, block);
 22
 23        #region builders
 24        /// <summary>
 25        /// Builds the pipeline as a single <see cref="IPropagatorBlock{TInput,TOutput}"/>
 26        /// </summary>
 27        /// <returns>A block that encapsulates the entire pipeline.</returns>
 28        public static IPropagatorBlock<TInput, TOutput> Build<TInput, TOutput>(
 29            this IPipelineBuilder<ITargetBlock<TInput>, ISourceBlock<TOutput>> b
 30        )
 31        {
 6932            if (b.Last == b.First && b.First is IPropagatorBlock<TInput, TOutput> p)
 33            {
 034                return p;
 35            }
 36            else
 37            {
 6938                p = DataflowBlock.Encapsulate(b.First, b.Last);
 6939                return p;
 40            }
 41        }
 42
 43        /// <summary>
 44        /// Builds the encapsulated pipeline as a single <see cref="ISourceBlock{TOutput}"/>
 45        /// </summary>
 46        /// <returns>A block that encapsulates the entire pipeline.</returns>
 47        public static ISourceBlock<TOutput> BuildSourceBlock<TOutput>(
 48            this IPipelineBuilder<IDataflowBlock, ISourceBlock<TOutput>> b
 49        )
 50        {
 151            if (b.Last == b.First)
 52            {
 053                return b.Last;
 54            }
 55            else
 56            {
 157                var p = b.First.EncapsulateAsSourceBlock(b.Last);
 158                return p;
 59            }
 60        }
 61
 62        /// <summary>
 63        /// Builds the pipeline as a single <see cref="ITargetBlock{TInput}"/>
 64        /// </summary>
 65        /// <returns>A block that encapsulates the entire pipeline.</returns>
 66        public static ITargetBlock<TInput> BuildTargetBlock<TInput>(
 67            this IPipelineBuilder<ITargetBlock<TInput>, IDataflowBlock> b
 68        )
 69        {
 170            if (b.Last == b.First)
 71            {
 072                return b.First;
 73            }
 74            else
 75            {
 176                var p = b.First.EncapsulateAsTargetBlock(b.Last);
 177                return p;
 78            }
 79        }
 80
 81        /// <summary>
 82        /// Builds the pipeline as a single <see cref="IDataflowBlock"/>
 83        /// </summary>
 84        /// <returns>A block that encapsulates the entire pipeline.</returns>
 85        public static IDataflowBlock BuildDataflowBlock<TInput>(
 86            this IPipelineBuilder<IDataflowBlock, IDataflowBlock> b
 87        )
 88        {
 089            if (b.Last == b.First)
 90            {
 091                return b.First;
 92            }
 93            else
 94            {
 095                var p = b.First.EncapsulateAsDataflowBlock(b.Last);
 096                return p;
 97            }
 98        }
 99        #endregion builders
 100
 101        #region selectors
 102        public static IPipelineBuilder<TNewFirst, TNewLast> Select<TFirst, TLast, TNewFirst, TNewLast>
 103            (
 104            this IPipelineBuilder<TFirst, TLast> p,
 105            Func<
 106                IPipelineBuilder<TFirst, TLast>,
 107                IPipelineBuilder<TNewFirst, TNewLast>
 108                > f)
 109        where TFirst : IDataflowBlock
 110        where TLast : IDataflowBlock
 111        where TNewFirst : IDataflowBlock
 112        where TNewLast : IDataflowBlock
 0113            => f(p);
 114
 115        public static IPipelineBuilder<TFirst, TNew> SelectLast<TFirst, TLast, TNew>(
 116            this IPipelineBuilder<TFirst, TLast> p, Func<TLast, TNew> f)
 117        where TFirst : IDataflowBlock
 118        where TLast : IDataflowBlock
 119        where TNew : IDataflowBlock
 228120            => new PipelineBuilder<TFirst, TNew>(p.First, f(p.Last));
 121
 122        public static IPipelineBuilder<TNew, TLast> SelectFirst<TFirst, TLast, TNew>(
 123            this IPipelineBuilder<TFirst, TLast> p, Func<TFirst, TNew> f)
 124        where TFirst : IDataflowBlock
 125        where TLast : IDataflowBlock
 126        where TNew : IDataflowBlock
 0127            => new PipelineBuilder<TNew, TLast>(f(p.First), p.Last);
 128        #endregion selectors
 129
 130        public static IPipelineBuilder<TFirst, TNewLast> LinkTo<TFirst, TNewLast, T>
 131            (this IPipelineBuilder<TFirst, ISourceBlock<T>> p, TNewLast nl, DataflowLinkOptions options)
 132            where TNewLast : ITargetBlock<T>
 133            where TFirst : IDataflowBlock
 134        =>
 104135            p.SelectLast(e =>
 104136            {
 104137                e.LinkTo(nl, options);
 104138                return nl;
 104139            });
 140
 141
 142        public static IPipelineBuilder<TFirst, TNewLast> LinkTo<TFirst, TNewLast, T>
 143            (this IPipelineBuilder<TFirst, ISourceBlock<T>> p, TNewLast nl, DataflowLinkOptions options, Predicate<T> predicate)
 144            where TNewLast : ITargetBlock<T>
 145            where TFirst : IDataflowBlock
 146        =>
 0147            p.SelectLast(e =>
 0148            {
 0149                e.LinkTo(nl, options, predicate);
 0150                return nl;
 0151            });
 152
 153        /// <summary>
 154        /// Builds a sub-pipeline starting from the current last block and replaces the last
 155        /// block with the result of that sub-pipeline.
 156        /// </summary>
 157        /// <typeparam name="TFirst">The first block type of the outer pipeline.</typeparam>
 158        /// <typeparam name="TLast">The current last block type, used as the start of the sub-pipeline.</typeparam>
 159        /// <typeparam name="TNewLast">The resulting last block type after the sub-pipeline.</typeparam>
 160        /// <param name="p">The current pipeline builder.</param>
 161        /// <param name="build">
 162        /// A function that receives a sub-pipeline starting at <typeparamref name="TLast"/>
 163        /// and returns its final block.
 164        /// </param>
 165        /// <returns>
 166        /// A new pipeline builder whose last block is the result of the sub-pipeline.
 167        /// </returns>
 168        public static IPipelineBuilder<TFirst, TNewLast>
 169            SubPipeline<TFirst, TLast, TNewLast>
 170            (this IPipelineBuilder<TFirst, TLast> p,
 171            Func<IPipelineBuilder<TLast, TLast>, TNewLast> build
 172        )
 173            where TFirst : IDataflowBlock
 174            where TLast : IDataflowBlock
 175            where TNewLast : IDataflowBlock
 2176        => p.SelectLast(e => build(e.Pipeline()));
 177
 178        #region convenience methods
 179        /// <summary>
 180        /// Links a BufferBlock to the end of the current IPipelineBuilder.
 181        /// </summary>
 182        /// <typeparam name="TFirst">The first block type.</typeparam>
 183        /// <typeparam name="T">The type of items in the buffer.</typeparam>
 184        /// <param name="p">The pipeline builder.</param>
 185        /// <param name="options">Optional DataflowBlockOptions.</param>
 186        /// <returns>A pipeline builder ending with a BufferBlock of T.</returns>
 187        public static IPipelineBuilder<TFirst, BufferBlock<T>> Buffer<TFirst, T>
 188        (
 189            this IPipelineBuilder<TFirst, ISourceBlock<T>> p,
 190            DataflowBlockOptions? options = null
 191        ) where TFirst : IDataflowBlock
 0192        => p.LinkTo(
 0193            new BufferBlock<T>(options ?? new()), new DataflowLinkOptions() { PropagateCompletion = true }
 0194        );
 195
 196        /// <summary>
 197        /// Links a TransformBlock to the end of the current IPipelineBuilder.
 198        /// </summary>
 199        public static IPipelineBuilder<TFirst, TransformBlock<I, O>> Transform<TFirst, I, O>
 200        (
 201            this IPipelineBuilder<TFirst, ISourceBlock<I>> p,
 202            Func<I, O> f,
 203            ExecutionDataflowBlockOptions? options = null
 204        ) where TFirst : IDataflowBlock
 10205        => p.LinkTo(
 10206            new TransformBlock<I, O>(f, options ?? new()), new DataflowLinkOptions() { PropagateCompletion = true }
 10207        );
 208
 209        /// <summary>
 210        /// Links a TransformManyBlock to the end of the current IPipelineBuilder.
 211        /// </summary>
 212        public static IPipelineBuilder<TFirst, TransformManyBlock<I, O>> TransformMany<TFirst, I, O>
 213        (
 214            this IPipelineBuilder<TFirst, ISourceBlock<I>> p,
 215            Func<I, IEnumerable<O>> f,
 216            ExecutionDataflowBlockOptions? options = null
 217        ) where TFirst : IDataflowBlock
 12218        => p.LinkTo(
 12219            new TransformManyBlock<I, O>(f, options ?? new()), new DataflowLinkOptions() { PropagateCompletion = true }
 12220        );
 221
 222        /// <summary>
 223        /// Links a TransformManyBlock to the end of the current IPipelineBuilder.
 224        /// </summary>
 225        public static IPipelineBuilder<TFirst, TransformManyBlock<I, O>> TransformMany<TFirst, I, O>
 226        (
 227            this IPipelineBuilder<TFirst, ISourceBlock<I>> p,
 228            Func<I, Task<IEnumerable<O>>> f,
 229            ExecutionDataflowBlockOptions? options = null
 230        ) where TFirst : IDataflowBlock
 0231        => p.LinkTo(
 0232            new TransformManyBlock<I, O>(f, options ?? new()), new DataflowLinkOptions() { PropagateCompletion = true }
 0233        );
 234
 235        /// <summary>
 236        /// Links a TransformManyBlock to the end of the current IPipelineBuilder.
 237        /// </summary>
 238        public static IPipelineBuilder<TFirst, TransformManyBlock<I, O>> TransformMany<TFirst, I, O>
 239        (
 240            this IPipelineBuilder<TFirst, ISourceBlock<I>> p,
 241            Func<I, IAsyncEnumerable<O>> f,
 242            ExecutionDataflowBlockOptions? options = null
 243        ) where TFirst : IDataflowBlock
 0244        => p.LinkTo(
 0245            new TransformManyBlock<I, O>(f, options ?? new()), new DataflowLinkOptions() { PropagateCompletion = true }
 0246        );
 247
 248        /// <summary>
 249        /// Links a BatchBlock to the end of the current IPipelineBuilder.
 250        /// </summary>
 251        public static IPipelineBuilder<TFirst, BatchBlock<T>> Batch<TFirst, T>
 252        (
 253            this IPipelineBuilder<TFirst, ISourceBlock<T>> p,
 254            int batchSize,
 255            GroupingDataflowBlockOptions? options = null
 256        ) where TFirst : IDataflowBlock
 0257        => p.LinkTo(
 0258            new BatchBlock<T>(batchSize, options ?? new()), new DataflowLinkOptions() { PropagateCompletion = true }
 0259        );
 260
 261
 262        /// <summary>
 263        /// Links an ActionBlock to the end of the current IPipelineBuilder.
 264        /// </summary>
 265        public static IPipelineBuilder<TFirst, ActionBlock<T>> Action<TFirst, T>
 266        (
 267            this IPipelineBuilder<TFirst, ISourceBlock<T>> p,
 268            Action<T> a,
 269            ExecutionDataflowBlockOptions? options = null
 270        ) where TFirst : IDataflowBlock
 0271        => p.LinkTo(
 0272            new ActionBlock<T>(a, options ?? new()), new DataflowLinkOptions() { PropagateCompletion = true }
 0273        );
 274
 275        #endregion convenience methods
 276
 351277        private sealed class PipelineBuilder<TFirst, TLast>(TFirst first, TLast last) : IPipelineBuilder<TFirst, TLast>
 278        where TFirst : IDataflowBlock
 279        where TLast : IDataflowBlock
 280        {
 721281            public TFirst First { get; } = first;
 721282            public TLast Last { get; } = last;
 283        }
 284    }
 285}

Methods/Properties

Pipeline(TBlock)
Build(CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<System.Threading.Tasks.Dataflow.ITargetBlock`1<TInput>,System.Threading.Tasks.Dataflow.ISourceBlock`1<TOutput>>)
BuildSourceBlock(CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<System.Threading.Tasks.Dataflow.IDataflowBlock,System.Threading.Tasks.Dataflow.ISourceBlock`1<TOutput>>)
BuildTargetBlock(CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<System.Threading.Tasks.Dataflow.ITargetBlock`1<TInput>,System.Threading.Tasks.Dataflow.IDataflowBlock>)
BuildDataflowBlock(CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<System.Threading.Tasks.Dataflow.IDataflowBlock,System.Threading.Tasks.Dataflow.IDataflowBlock>)
Select(CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<TFirst,TLast>,System.Func`2<CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<TFirst,TLast>,CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<TNewFirst,TNewLast>>)
SelectLast(CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<TFirst,TLast>,System.Func`2<TLast,TNew>)
SelectFirst(CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<TFirst,TLast>,System.Func`2<TFirst,TNew>)
LinkTo(CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<TFirst,System.Threading.Tasks.Dataflow.ISourceBlock`1<T>>,TNewLast,System.Threading.Tasks.Dataflow.DataflowLinkOptions)
LinkTo(CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<TFirst,System.Threading.Tasks.Dataflow.ISourceBlock`1<T>>,TNewLast,System.Threading.Tasks.Dataflow.DataflowLinkOptions,System.Predicate`1<T>)
SubPipeline(CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<TFirst,TLast>,System.Func`2<CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<TLast,TLast>,TNewLast>)
Buffer(CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<TFirst,System.Threading.Tasks.Dataflow.ISourceBlock`1<T>>,System.Threading.Tasks.Dataflow.DataflowBlockOptions)
Transform(CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<TFirst,System.Threading.Tasks.Dataflow.ISourceBlock`1<I>>,System.Func`2<I,O>,System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions)
TransformMany(CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<TFirst,System.Threading.Tasks.Dataflow.ISourceBlock`1<I>>,System.Func`2<I,System.Collections.Generic.IEnumerable`1<O>>,System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions)
TransformMany(CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<TFirst,System.Threading.Tasks.Dataflow.ISourceBlock`1<I>>,System.Func`2<I,System.Threading.Tasks.Task`1<System.Collections.Generic.IEnumerable`1<O>>>,System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions)
TransformMany(CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<TFirst,System.Threading.Tasks.Dataflow.ISourceBlock`1<I>>,System.Func`2<I,System.Collections.Generic.IAsyncEnumerable`1<O>>,System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions)
Batch(CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<TFirst,System.Threading.Tasks.Dataflow.ISourceBlock`1<T>>,System.Int32,System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions)
Action(CounterpointCollective.Dataflow.Fluent.IPipelineBuilder`2<TFirst,System.Threading.Tasks.Dataflow.ISourceBlock`1<T>>,System.Action`1<T>,System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions)
.ctor(TFirst,TLast)
get_First()
get_Last()