< Summary

Information
Class: CounterpointCollective.Dataflow.ISourceBlockExtensions
Assembly: Dataflow.Composable
File(s): /builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/ISourceBlockExtensions.cs
Line coverage
61%
Covered lines: 80
Uncovered lines: 50
Coverable lines: 130
Total lines: 384
Line coverage: 61.5%
Branch coverage
78%
Covered branches: 64
Total branches: 82
Branch coverage: 78%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

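| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| Filter(...) | 100% | 2 | 1 | 0% |
| Transform(...) | 100% | 2 | 1 | 0% |
| Transform(...) | 100% | 2 | 1 | 0% |
| Transform(...) | 100% | 1 | 1 | 100% |
| Transform(...) | 100% | 2 | 1 | 0% |
| Flatten(...) | 83.33% | 6 | 6 | 100% |
| TransformMany(...) | 0% | 6 | 2 | 0% |
| TransformMany(...) | 100% | 2 | 1 | 0% |
| TransformMany(...) | 100% | 2 | 1 | 0% |
| TransformMany(...) | 100% | 2 | 1 | 0% |
| GroupAdjacent(...) | 100% | 1 | 1 | 100% |
| Batch(...) | 100% | 1 | 1 | 100% |
| Batch(...) | 100% | 1 | 1 | 100% |
| Buffer(...) | 100% | 1 | 1 | 100% |
| Buffer(...) | 100% | 1 | 1 | 100% |
| Action(...) | 100% | 2 | 1 | 0% |
| Action(...) | 100% | 2 | 1 | 0% |
| Action(...) | 100% | 2 | 1 | 0% |
| Action(...) | 100% | 1 | 1 | 100% |
| BroadcastTo(...) | 100% | 2 | 1 | 0% |
| BroadcastTo(...) | 0% | 6 | 2 | 0% |
| GuaranteedBroadcastTo(...) | 100% | 2 | 1 | 0% |
| GuaranteedBroadcastTo(...) | 0% | 6 | 2 | 0% |
| WithBuffer(...) | 100% | 2 | 1 | 0% |
| Finally() | 94.73% | 19 | 19 | 100% |
| ToListAsync() | 100% | 1 | 1 | 100% |
| AsAsyncEnumerable(...) | 83.33% | 6 | 6 | 100% |
| UnwrapAggregateAsync() | 100% | 1 | 1 | 83.33% |
| Fast() | 82.35% | 34 | 34 | 100% |
| Slow() | 72.72% | 11 | 11 | 100% |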

File(s)

/builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/ISourceBlockExtensions.cs

#LineLine coverage
 1using CounterpointCollective.Dataflow.Internal;
 2using System.Diagnostics;
 3using System.Runtime.ExceptionServices;
 4using System.Threading.Tasks;
 5using System.Threading.Tasks.Dataflow;
 6
 7namespace CounterpointCollective.Dataflow
 8{
 9    /// <exclude />
 10    public static class ISourceBlockExtensions
 11    {
 12        public static SynchronousFilterBlock<T> Filter<T>(
 13            this ISourceBlock<T> source,
 14            Predicate<T> pred,
 15            FilterMode mode = FilterMode.Block
 16        )
 017        => new(source, pred, mode);
 18
 19        public static ISourceBlock<O> Transform<I, O>(this ISourceBlock<I> source, Func<I, O> f) =>
 020            Transform(source, f, new());
 21
 22        public static ISourceBlock<O> Transform<I, O>(
 23            this ISourceBlock<I> source,
 24            Func<I, Task<O>> f
 025        ) => Transform(source, f, new());
 26
 27        public static TransformBlock<I,O> Transform<I, O>(
 28            this ISourceBlock<I> source,
 29            Func<I, O> f,
 30            ExecutionDataflowBlockOptions opts
 31        )
 32        {
 133            var t = new TransformBlock<I, O>(f, opts);
 134            _ = source.LinkTo(t, new DataflowLinkOptions() { PropagateCompletion = true });
 135            return t;
 36        }
 37
 38        public static ISourceBlock<O> Transform<I, O>(
 39            this ISourceBlock<I> source,
 40            Func<I, Task<O>> f,
 41            ExecutionDataflowBlockOptions opts
 42        )
 43        {
 044            var t = new TransformBlock<I, O>(f, opts);
 045            _ = source.LinkTo(t, new DataflowLinkOptions() { PropagateCompletion = true });
 046            return t;
 47        }
 48
 49        public static ISourceBlock<T> Flatten<T> (
 50            this ISourceBlock<ISourceBlock<T>> sources,
 51            ExecutionDataflowBlockOptions? options = null
 52        )
 53        {
 154            options ??= new();
 155            if (options.EnsureOrdered)
 56            {
 157                options.MaxDegreeOfParallelism = 1;
 58            }
 59
 160            BufferBlock<T> b = new(options);
 161            var a = sources.Action(async s => {
 204862                using var _ = s.LinkTo(b);
 204863                await s.Completion;
 204864            }, new ExecutionDataflowBlockOptions
 165            {
 166                EnsureOrdered = options.EnsureOrdered,
 167                CancellationToken = options.CancellationToken,
 168                SingleProducerConstrained = true,
 169                BoundedCapacity = options.BoundedCapacity,
 170                MaxDegreeOfParallelism = options.EnsureOrdered ? 1 : options.MaxDegreeOfParallelism
 171            });
 72
 173            _ = a.PropagateCompletion(b);
 74
 175            return b;
 76        }
 77
 78        [Obsolete("Use Transform and Flatten instead")]
 79        public static ISourceBlock<O> TransformMany<I, O>(
 80            this ISourceBlock<I> source,
 81            Func<I, Task<ISourceBlock<O>>> f,
 82            ExecutionDataflowBlockOptions optsForProcessing,
 83            DataflowBlockOptions optsForBuffer
 84        )
 85        {
 086            if (optsForProcessing.EnsureOrdered)
 87            {
 088                optsForProcessing.MaxDegreeOfParallelism = 1;
 89            }
 090            BufferBlock<O> b = new(optsForBuffer);
 091            var a = source
 092                .Action(async msg =>
 093                {
 094                var q = await f(msg);
 095                _ = q.LinkTo(b);
 096                await q.Completion;
 097                },
 098                optsForProcessing
 099            );
 100
 0101            _ = a.PropagateCompletion(b);
 102
 0103            return b;
 104        }
 105
 106        public static TransformManyBlock<I,O> TransformMany<I, O>(
 107            this ISourceBlock<I> source,
 108            Func<I, Task<IEnumerable<O>>> f,
 109            ExecutionDataflowBlockOptions dataflowBlockOptions
 110        )
 111        {
 0112            TransformManyBlock<I, O> res = new(f, dataflowBlockOptions);
 0113            _ = source.LinkTo(res, new DataflowLinkOptions { PropagateCompletion = true });
 0114            return res;
 115        }
 116
 117        public static ISourceBlock<O> TransformMany<I, O>(
 118            this ISourceBlock<I> source,
 119            Func<I, IEnumerable<O>> f,
 120            ExecutionDataflowBlockOptions dataflowBlockOptions
 121        )
 122        {
 0123            TransformManyBlock<I, O> res = new(f, dataflowBlockOptions);
 0124            _ = source.LinkTo(res, new DataflowLinkOptions { PropagateCompletion = true });
 0125            return res;
 126        }
 127
 128        public static TransformManyBlock<I, O> TransformMany<I, O>(
 129            this ISourceBlock<I> source,
 130            Func<I, IAsyncEnumerable<O>> f,
 131            ExecutionDataflowBlockOptions dataflowBlockOptions
 132        )
 133        {
 0134            TransformManyBlock<I, O> res = new(f, dataflowBlockOptions);
 0135            _ = source.LinkTo(res, new DataflowLinkOptions { PropagateCompletion = true });
 0136            return res;
 137        }
 138
 139        public static ISourceBlock<IGrouping<K, V>> GroupAdjacent<T, K, V>(
 140            this ISourceBlock<T> source,
 141            Func<T, K> keySelector,
 142            Func<T, V> valueSelector,
 143            ExecutionDataflowBlockOptions options
 144        )
 145        {
 2146            var res = new GroupAdjacentBlock<T, K, V>(keySelector, valueSelector, options);
 2147            _ = source.LinkTo(res, new DataflowLinkOptions { PropagateCompletion = true });
 2148            return res;
 149        }
 150
 151        public static BatchBlock<T> Batch<T>(this ISourceBlock<T> source, int batchSize) =>
 1152            Batch(source, batchSize, new());
 153
 154        public static BatchBlock<T> Batch<T>(
 155            this ISourceBlock<T> source,
 156            int batchSize,
 157            GroupingDataflowBlockOptions opts
 158        )
 159        {
 1160            var res = new BatchBlock<T>(batchSize, opts);
 1161            _ = source.LinkTo(res, new DataflowLinkOptions { PropagateCompletion = true });
 1162            return res;
 163        }
 164
 165        public static BufferBlock<T> Buffer<T>(this ISourceBlock<T> source) =>
 1166            Buffer(source, new());
 167
 168        public static BufferBlock<T> Buffer<T>(
 169            this ISourceBlock<T> source,
 170            DataflowBlockOptions opts
 171        )
 172        {
 1173            var res = new BufferBlock<T>(opts);
 1174            _ = source.LinkTo(res, new DataflowLinkOptions { PropagateCompletion = true });
 1175            return res;
 176        }
 177
 178        public static ActionBlock<T> Action<T>(this ISourceBlock<T> source, Action<T> action) =>
 0179            Action(source, action, new());
 180
 181        public static ActionBlock<T> Action<T>(
 182            this ISourceBlock<T> source,
 183            Action<T> action,
 184            ExecutionDataflowBlockOptions opts
 185        )
 186        {
 0187            var res = new ActionBlock<T>(action, opts);
 0188            _ = source.LinkTo(res, new DataflowLinkOptions { PropagateCompletion = true });
 0189            return res;
 190        }
 191
 192        public static ActionBlock<T> Action<T>(this ISourceBlock<T> source, Func<T, Task> action) =>
 0193            Action(source, action, new());
 194
 195        public static ActionBlock<T> Action<T>(
 196            this ISourceBlock<T> source,
 197            Func<T, Task> action,
 198            ExecutionDataflowBlockOptions opts
 199        )
 200        {
 1201            var res = new ActionBlock<T>(action, opts);
 1202            _ = source.LinkTo(res, new DataflowLinkOptions { PropagateCompletion = true });
 1203            return res;
 204        }
 205
 206        public static BroadcastBlock<T> BroadcastTo<T>(
 207            this ISourceBlock<T> source,
 208            Func<T, T>? clone,
 209            params ITargetBlock<T>[] targets
 0210        ) => BroadcastTo(source, clone, new(), targets);
 211
 212        public static BroadcastBlock<T> BroadcastTo<T>(
 213            this ISourceBlock<T> source,
 214            Func<T, T>? clone,
 215            DataflowBlockOptions opts,
 216            params ITargetBlock<T>[] targets
 217        )
 218        {
 0219            var res = new BroadcastBlock<T>(clone, opts);
 0220            foreach (var t in targets)
 221            {
 0222                _ = res.LinkTo(t, new DataflowLinkOptions { PropagateCompletion = true });
 223            }
 0224            _ = source.LinkTo(res, new DataflowLinkOptions { PropagateCompletion = true });
 0225            return res;
 226        }
 227
 228        public static GuaranteedBroadcastBlock<T> GuaranteedBroadcastTo<T>(
 229            this ISourceBlock<T> source,
 230            params ITargetBlock<T>[] targets)
 0231        => GuaranteedBroadcastTo(source, new(), targets);
 232
 233        public static GuaranteedBroadcastBlock<T> GuaranteedBroadcastTo<T>(
 234            this ISourceBlock<T> source,
 235            GuaranteedBroadcastBlockOptions opts,
 236            params ITargetBlock<T>[] targets)
 237        {
 0238            var res = new GuaranteedBroadcastBlock<T>(targets.Length, opts);
 0239            for (var i = 0; i < targets.Length; i++)
 240            {
 0241                _ = res[i].LinkTo(targets[i], new DataflowLinkOptions { PropagateCompletion = true });
 242            }
 0243            _ = source.LinkTo(res, new DataflowLinkOptions { PropagateCompletion = true });
 0244            return res;
 245        }
 246
 247        /// <summary>
 248        /// Will eagerly buffer <paramref name="source"/> items from the async enumerable.
 249        /// </summary>
 250        public static IAsyncEnumerable<T> WithBuffer<T>(this IAsyncEnumerable<T> source, int bufferSize = DataflowBlockOptions.Unbounded, CancellationToken cancellationToken = default)
 0251        => source
 0252                .AsSourceBlock(options: new() { BoundedCapacity = bufferSize, CancellationToken = cancellationToken })
 0253                .AsAsyncEnumerable(cancellationToken: cancellationToken);
 254
 255        public static async IAsyncEnumerable<T> Finally<T>(
 256            this IAsyncEnumerable<T> source,
 257            Action finallyAction)
 258        {
 259            try
 260            {
 131971261                await foreach (var item in source)
 262                {
 65972263                    yield return item;
 264                }
 13265            }
 266            finally
 267            {
 14268                finallyAction();
 269            }
 13270        }
 271
 272        /// <summary>
 273        /// Asynchronously collects all elements from an <see cref="ISourceBlock{T}"/> into a list.
 274        /// </summary>
 275        /// <typeparam name="T">The type of elements produced by the source block.</typeparam>
 276        /// <param name="source">The source block to read elements from.</param>
 277        /// <param name="cancellationToken">A <see cref="CancellationToken"/> to observe while waiting for completion.</param>
 278        /// <returns>A task that represents the asynchronous operation. The task result contains a list of all elements produced by the source block.</returns>
 279        public static async Task<IList<T>> ToListAsync<T>(this ISourceBlock<T> source, CancellationToken cancellationToken = default)
 280        {
 281            {
 1282                var b = new BufferBlock<T>(new() { CancellationToken = cancellationToken });
 1283                source.LinkTo(b, new DataflowLinkOptions { PropagateCompletion = true });
 1284                await source.Completion;
 1285                var succ = b.TryReceiveAll(out var items);
 286
 287                // The buffer should complete; propagate any cancellation exceptions
 1288                await b.Completion;
 289
 290                Debug.Assert(succ);
 1291                return items!;
 292            }
 1293        }
 294
 295        public static IAsyncEnumerable<T> AsAsyncEnumerable<T>(
 296            this ISourceBlock<T> source,
 297            bool throwFirstExceptionOnly = true,
 298            bool completeSource = true,
 299            CancellationToken cancellationToken = default
 300        )
 301        {
 14302            var ret = source is IReceivableSourceBlock<T> r ? Fast(r) : Slow();
 303
 14304            if (completeSource)
 305            {
 14306                ret = ret.Finally(() =>
 14307                {
 14308                    if (!source.Completion.IsCompleted)
 14309                    {
 1310                        source.Fault(new OperationCanceledException("Evaluation stopped prematurely"));
 14311                    }
 28312                });
 313            }
 314
 14315            return ret;
 316
 317            async Task<TResult> UnwrapAggregateAsync<TResult>(Func<Task<TResult>> fn)
 318            {
 319                try
 320                {
 237321                    return await fn();
 322                }
 1323                catch (AggregateException ex) when (throwFirstExceptionOnly)
 324                {
 1325                    var inner = ex.Flatten().InnerExceptions.First();
 1326                    ExceptionDispatchInfo.Capture(inner).Throw();
 0327                    throw; // unreachable
 328                }
 235329            }
 330
 331            Task<bool> OutputAvailableAsync() => UnwrapAggregateAsync(() => source.OutputAvailableAsync(cancellationToken));
 332            Task<T> ReceiveAsync() => UnwrapAggregateAsync(() => source.ReceiveAsync(cancellationToken));
 333            Task<int> CompletionAsync() => UnwrapAggregateAsync(async () => {
 334                await source.Completion;
 335                return 0;
 336            });
 337
 338            async IAsyncEnumerable<T> Fast(IReceivableSourceBlock<T> source)
 339            {
 213340                while (await OutputAvailableAsync())
 341                {
 201342                    var fastPathFails = true;
 409343                    while (source.TryReceiveAll(out var l))
 344                    {
 209345                        fastPathFails = false;
 132341346                        foreach (var t in l)
 347                        {
 65962348                            yield return t;
 349                        }
 208350                    }
 351
 200352                    if (fastPathFails)
 353                    {
 354                        //Fall back to slow path
 22355                        await foreach (var i in Slow())
 356                        {
 10357                            yield return i;
 358                        }
 1359                        yield break;
 360                    }
 361                }
 12362                await CompletionAsync();
 363            }
 364
 365            async IAsyncEnumerable<T> Slow()
 366            {
 367
 368                while (true)
 369                {
 370                    T t;
 371                    try
 372                    {
 11373                        t = await ReceiveAsync();
 11374                    } catch (Exception) when (source.Completion.IsCompleted)
 375                    {
 1376                        break;
 377                    }
 10378                    yield return t;
 379                }
 1380                await CompletionAsync();
 1381            }
 13382        }
 383    }
 384}

Methods/Properties

Filter(System.Threading.Tasks.Dataflow.ISourceBlock`1<T>,System.Predicate`1<T>,CounterpointCollective.Dataflow.FilterMode)
Transform(System.Threading.Tasks.Dataflow.ISourceBlock`1<I>,System.Func`2<I,O>)
Transform(System.Threading.Tasks.Dataflow.ISourceBlock`1<I>,System.Func`2<I,System.Threading.Tasks.Task`1<O>>)
Transform(System.Threading.Tasks.Dataflow.ISourceBlock`1<I>,System.Func`2<I,O>,System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions)
Transform(System.Threading.Tasks.Dataflow.ISourceBlock`1<I>,System.Func`2<I,System.Threading.Tasks.Task`1<O>>,System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions)
Flatten(System.Threading.Tasks.Dataflow.ISourceBlock`1<System.Threading.Tasks.Dataflow.ISourceBlock`1<T>>,System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions)
TransformMany(System.Threading.Tasks.Dataflow.ISourceBlock`1<I>,System.Func`2<I,System.Threading.Tasks.Task`1<System.Threading.Tasks.Dataflow.ISourceBlock`1<O>>>,System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions,System.Threading.Tasks.Dataflow.DataflowBlockOptions)
TransformMany(System.Threading.Tasks.Dataflow.ISourceBlock`1<I>,System.Func`2<I,System.Threading.Tasks.Task`1<System.Collections.Generic.IEnumerable`1<O>>>,System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions)
TransformMany(System.Threading.Tasks.Dataflow.ISourceBlock`1<I>,System.Func`2<I,System.Collections.Generic.IEnumerable`1<O>>,System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions)
TransformMany(System.Threading.Tasks.Dataflow.ISourceBlock`1<I>,System.Func`2<I,System.Collections.Generic.IAsyncEnumerable`1<O>>,System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions)
GroupAdjacent(System.Threading.Tasks.Dataflow.ISourceBlock`1<T>,System.Func`2<T,K>,System.Func`2<T,V>,System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions)
Batch(System.Threading.Tasks.Dataflow.ISourceBlock`1<T>,System.Int32)
Batch(System.Threading.Tasks.Dataflow.ISourceBlock`1<T>,System.Int32,System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions)
Buffer(System.Threading.Tasks.Dataflow.ISourceBlock`1<T>)
Buffer(System.Threading.Tasks.Dataflow.ISourceBlock`1<T>,System.Threading.Tasks.Dataflow.DataflowBlockOptions)
Action(System.Threading.Tasks.Dataflow.ISourceBlock`1<T>,System.Action`1<T>)
Action(System.Threading.Tasks.Dataflow.ISourceBlock`1<T>,System.Action`1<T>,System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions)
Action(System.Threading.Tasks.Dataflow.ISourceBlock`1<T>,System.Func`2<T,System.Threading.Tasks.Task>)
Action(System.Threading.Tasks.Dataflow.ISourceBlock`1<T>,System.Func`2<T,System.Threading.Tasks.Task>,System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions)
BroadcastTo(System.Threading.Tasks.Dataflow.ISourceBlock`1<T>,System.Func`2<T,T>,System.Threading.Tasks.Dataflow.ITargetBlock`1<T>[])
BroadcastTo(System.Threading.Tasks.Dataflow.ISourceBlock`1<T>,System.Func`2<T,T>,System.Threading.Tasks.Dataflow.DataflowBlockOptions,System.Threading.Tasks.Dataflow.ITargetBlock`1<T>[])
GuaranteedBroadcastTo(System.Threading.Tasks.Dataflow.ISourceBlock`1<T>,System.Threading.Tasks.Dataflow.ITargetBlock`1<T>[])
GuaranteedBroadcastTo(System.Threading.Tasks.Dataflow.ISourceBlock`1<T>,CounterpointCollective.Dataflow.GuaranteedBroadcastBlockOptions,System.Threading.Tasks.Dataflow.ITargetBlock`1<T>[])
WithBuffer(System.Collections.Generic.IAsyncEnumerable`1<T>,System.Int32,System.Threading.CancellationToken)
Finally()
ToListAsync()
AsAsyncEnumerable(System.Threading.Tasks.Dataflow.ISourceBlock`1<T>,System.Boolean,System.Boolean,System.Threading.CancellationToken)
UnwrapAggregateAsync()
Fast()
Slow()