| | | 1 | | using CounterpointCollective.Dataflow.Fluent; |
| | | 2 | | using CounterpointCollective.Dataflow.Notifying; |
| | | 3 | | using System.Threading.Tasks.Dataflow; |
| | | 4 | | |
| | | 5 | | namespace CounterpointCollective.Dataflow.Fluent |
| | | 6 | | { |
| | | 7 | | public static class PipelineBuilderExtensions |
| | | 8 | | { |
| | | 9 | | /// <summary> |
| | | 10 | | /// Links an choice block to the end of the current IPipelineBuilder. |
| | | 11 | | /// </summary> |
| | | 12 | | public static IPipelineBuilder<TFirst, ChoiceBlock<T, O>> Choice<TFirst, T, O>( |
| | | 13 | | this IPipelineBuilder<TFirst, ISourceBlock<T>> p, |
| | | 14 | | Predicate<T> predicate, |
| | | 15 | | IPropagatorBlock<T, O> thenBlock, |
| | | 16 | | IPropagatorBlock<T, O> elseBlock, |
| | | 17 | | ExecutionDataflowBlockOptions? options = null |
| | | 18 | | ) where TFirst : IDataflowBlock |
| | | 19 | | { |
| | 1 | 20 | | var choiceBlock = |
| | 1 | 21 | | new ChoiceBlock<T, O>(predicate, thenBlock, elseBlock, options ?? new()); |
| | 1 | 22 | | return p.LinkTo(choiceBlock, new DataflowLinkOptions() { PropagateCompletion = true }); |
| | | 23 | | } |
| | | 24 | | |
| | | 25 | | /// <summary> |
| | | 26 | | /// Links an GroupAdjacentBlock to the end of the current IPipelineBuilder. |
| | | 27 | | /// </summary> |
| | | 28 | | public static IPipelineBuilder<TFirst, GroupAdjacentBlock<T, TKey, T>> GroupAdjacent<TFirst, T, TKey>( |
| | | 29 | | this IPipelineBuilder<TFirst, ISourceBlock<T>> p, |
| | | 30 | | Func<T, TKey> keySelector, |
| | | 31 | | ExecutionDataflowBlockOptions options, |
| | | 32 | | bool flushOnIdle = false |
| | | 33 | | ) where TFirst : IDataflowBlock |
| | 0 | 34 | | => GroupAdjacent(p, keySelector, e => e, options, flushOnIdle); |
| | | 35 | | |
| | | 36 | | /// <summary> |
| | | 37 | | /// Links an GroupAdjacentBlock to the end of the current IPipelineBuilder. |
| | | 38 | | /// </summary> |
| | | 39 | | public static IPipelineBuilder<TFirst, GroupAdjacentBlock<T, TKey, TValue>> GroupAdjacent<TFirst, T, TKey, TValu |
| | | 40 | | this IPipelineBuilder<TFirst, ISourceBlock<T>> p, |
| | | 41 | | Func<T, TKey> keySelector, |
| | | 42 | | Func<T, TValue> valueSelector, |
| | | 43 | | ExecutionDataflowBlockOptions options, |
| | | 44 | | bool flushOnIdle = false |
| | | 45 | | ) where TFirst : IDataflowBlock |
| | 0 | 46 | | => p.LinkTo(new GroupAdjacentBlock<T, TKey, TValue>(keySelector, valueSelector, options, flushOnIdle), new Dataf |
| | | 47 | | |
| | | 48 | | /// <summary> |
| | | 49 | | /// Links an TeeBlock to the end of the current IPipelineBuilder. |
| | | 50 | | /// </summary> |
| | | 51 | | public static IPipelineBuilder<TFirst, ISourceBlock<O>> Tee<TFirst, I, T, O>( |
| | | 52 | | this IPipelineBuilder<TFirst, ISourceBlock<I>> p, |
| | | 53 | | IPropagatorBlock<I, T> inner, |
| | | 54 | | Func<I, T, O> combinator, |
| | | 55 | | DataflowBlockOptions? options = null |
| | | 56 | | ) where TFirst : IDataflowBlock |
| | | 57 | | { |
| | 1 | 58 | | var t = new TeeBlock<I, T, O>( |
| | 1 | 59 | | inner, |
| | 1 | 60 | | combinator, |
| | 1 | 61 | | options ?? new DataflowBlockOptions() |
| | 1 | 62 | | ); |
| | 1 | 63 | | return p.LinkTo(t, new() { PropagateCompletion = true }); |
| | | 64 | | } |
| | | 65 | | |
| | | 66 | | /// <summary> |
| | | 67 | | /// Links a ParallelBlock to the end of the current IPipelineBuilder. |
| | | 68 | | /// </summary> |
| | | 69 | | public static IPipelineBuilder<TFirst, ISourceBlock<TOutput>> Parallel<TFirst, TInput, T1, T2, TOutput>( |
| | | 70 | | this IPipelineBuilder<TFirst, ISourceBlock<TInput>> p, |
| | | 71 | | IPropagatorBlock<TInput, T1> block1, |
| | | 72 | | IPropagatorBlock<TInput, T2> block2, |
| | | 73 | | Func<T1, T2, TOutput> f, |
| | | 74 | | GuaranteedBroadcastBlockOptions? options = null |
| | | 75 | | ) where TFirst : IDataflowBlock |
| | | 76 | | { |
| | 0 | 77 | | var inner = block1.Par(block2, f, options ?? new()); |
| | 0 | 78 | | return p.LinkTo(inner, new() { PropagateCompletion = true }); |
| | | 79 | | } |
| | | 80 | | |
| | | 81 | | /// <summary> |
| | | 82 | | /// Links a ParallelBlock to the end of the current IPipelineBuilder. |
| | | 83 | | /// </summary> |
| | | 84 | | public static IPipelineBuilder<TFirst, ISourceBlock<TOutput>> Parallel<TFirst, TOutput>( |
| | | 85 | | this IPipelineBuilder<TFirst, ISourceBlock<TOutput>> p, |
| | | 86 | | IPropagatorBlock<TOutput, TOutput>[] blocks, |
| | | 87 | | GuaranteedBroadcastBlockOptions? options = null |
| | | 88 | | ) where TFirst : IDataflowBlock |
| | | 89 | | { |
| | 0 | 90 | | var inner = blocks.Par(options ?? new()); |
| | 0 | 91 | | return p.LinkTo(inner, new() { PropagateCompletion = true }); |
| | | 92 | | } |
| | | 93 | | |
| | | 94 | | /// <summary> |
| | | 95 | | /// Links a ResizableBuffer to the end of the current IPipelineBuilder. |
| | | 96 | | /// </summary> |
| | | 97 | | public static IPipelineBuilder<TFirst, ResizableBufferBlock<TOutput>> ResizableBuffer<TFirst, TOutput>( |
| | | 98 | | this IPipelineBuilder<TFirst, ISourceBlock<TOutput>> p, |
| | | 99 | | DataflowBlockOptions? options = null, |
| | | 100 | | Action? onEntered = null |
| | | 101 | | ) where TFirst : IDataflowBlock |
| | 0 | 102 | | => p.LinkTo(new ResizableBufferBlock<TOutput>(options ?? new(), onEntered), new() { PropagateCompletion = true } |
| | | 103 | | |
| | | 104 | | /// <summary> |
| | | 105 | | /// Wraps the last block of the current IPipelineBuilder in a SynchronousFilterBlock. |
| | | 106 | | /// </summary> |
| | | 107 | | public static IPipelineBuilder<TFirst, SynchronousFilterBlock<T>> SynchronousFilter<TFirst, T>( |
| | | 108 | | this IPipelineBuilder<TFirst, ISourceBlock<T>> p, |
| | | 109 | | Predicate<T> predicate, |
| | | 110 | | FilterMode mode = FilterMode.Block |
| | | 111 | | ) where TFirst : IDataflowBlock |
| | 52 | 112 | | => p.SelectLast(e => new SynchronousFilterBlock<T>(e, predicate, mode)); |
| | | 113 | | |
| | | 114 | | /// <summary> |
| | | 115 | | /// Wraps the last block of the current IPipelineBuilder in a SynchronousTransformingBlock. |
| | | 116 | | /// </summary> |
| | | 117 | | public static IPipelineBuilder<TFirst, SynchronousTransformingBlock<T, O>> SynchronousTransform<TFirst, T, O>( |
| | | 118 | | this IPipelineBuilder<TFirst, ISourceBlock<T>> p, |
| | | 119 | | Func<T, O> transform |
| | | 120 | | ) where TFirst : IDataflowBlock |
| | 56 | 121 | | => p.SelectLast(e => new SynchronousTransformingBlock<T, O>(e, transform)); |
| | | 122 | | |
| | | 123 | | /// <summary> |
| | | 124 | | /// Wraps the last block of the current IPipelineBuilder in a SourceBlockWithDeliveryNotification. |
| | | 125 | | /// </summary> |
| | | 126 | | public static IPipelineBuilder<TFirst, NotifyingSourceBlock<T>> WithNotification<TFirst, T>( |
| | | 127 | | this IPipelineBuilder<TFirst, ISourceBlock<T>> p, |
| | | 128 | | ConfigureHooks<T> hooks |
| | | 129 | | ) where TFirst : IDataflowBlock |
| | 112 | 130 | | => p.SelectLast(l => l.WithNotification(hooks)); |
| | | 131 | | } |
| | | 132 | | } |