< Summary

Information
Class: CounterpointCollective.Dataflow.IAsyncEnumerableExtensions
Assembly: Dataflow.Composable
File(s): /builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/IAsyncEnumerableExtensions.cs
Line coverage
89%
Covered lines: 17
Uncovered lines: 2
Coverable lines: 19
Total lines: 74
Line coverage: 89.4%
Branch coverage
100%
Covered branches: 10
Total branches: 10
Branch coverage: 100%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
AsSourceBlock(...) | 100% | 1 | 1 | 100%
AsSourceBlock(...) | 100% | 1 | 1 | 100%
AsSourceBlock(...) | 100% | 2 | 1 | 0%
AsSourceBlock(...) | 100% | 1 | 1 | 100%
FeedTo(...) | 100% | 2 | 1 | 0%
FeedTo() | 100% | 10 | 10 | 100%

File(s)

/builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/IAsyncEnumerableExtensions.cs

# | Line | Line coverage
 1using System;
 2using System.Collections.Generic;
 3using System.Threading;
 4using System.Threading.Tasks;
 5using System.Threading.Tasks.Dataflow;
 6
 7namespace CounterpointCollective.Dataflow
 8{
 9    public static class IAsyncEnumerableExtensions
 10    {
 11        public static BufferBlock<I> AsSourceBlock<I>(this IAsyncEnumerable<I> source) =>
 205612            AsSourceBlock(source, true);
 13
 14        public static BufferBlock<I> AsSourceBlock<I>(
 15            this IAsyncEnumerable<I> source,
 16            bool complete
 205617        ) => AsSourceBlock(source, new DataflowBlockOptions(), complete);
 18
 19        public static BufferBlock<I> AsSourceBlock<I>(
 20            this IAsyncEnumerable<I> source,
 21            DataflowBlockOptions options
 022        ) => AsSourceBlock(source, options, true);
 23
 24        public static BufferBlock<I> AsSourceBlock<I>(
 25            this IAsyncEnumerable<I> source,
 26            DataflowBlockOptions options,
 27            bool complete
 28        )
 29        {
 205630            var bufferBlock = new BufferBlock<I>(options);
 205631            _ = source.FeedTo(bufferBlock, complete, options.CancellationToken);
 205632            return bufferBlock;
 33        }
 34
 35        public static Task FeedTo<T>(
 36            this IAsyncEnumerable<T> source,
 37            ITargetBlock<T> target,
 38            CancellationToken cancel = default
 039        ) => FeedTo(source, target, true, cancel);
 40
 41        public static async Task FeedTo<T>(
 42            this IAsyncEnumerable<T> source,
 43            ITargetBlock<T> target,
 44            bool complete,
 45            CancellationToken cancel = default
 46        )
 47        {
 48            try
 49            {
 26696350                await foreach (var e in source.WithCancellation(cancel))
 51                {
 13142652                    var consumed = await target.SendAsync(e, cancel);
 13142653                    if (cancel.IsCancellationRequested)
 54                    {
 55                        break;
 56                    }
 57
 13142658                    if (!consumed)
 59                    {
 160                        throw new ArgumentException("Target does not accept message");
 61                    }
 62                }
 205463                if (complete)
 64                {
 205465                    target.Complete();
 66                }
 205467            }
 268            catch (Exception e)
 69            {
 270                target.Fault(e);
 271            }
 205672        }
 73    }
 74}