| | | 1 | | using CounterpointCollective.Collections; |
| | | 2 | | using CounterpointCollective.Dataflow.Encapsulation; |
| | | 3 | | using CounterpointCollective.Dataflow.Fluent; |
| | | 4 | | using CounterpointCollective.Dataflow.Internal; |
| | | 5 | | using System.Threading.Tasks.Dataflow; |
| | | 6 | | |
| | | 7 | | namespace CounterpointCollective.Dataflow |
| | | 8 | | { |
| | | 9 | | public class NonOrderPreservingChoiceBlock<TInput, TOutput> : AbstractEncapsulatedPropagatorBlock<TInput, TOutput>, |
| | | 10 | | { |
| | 100402 | 11 | | protected override ITargetBlock<TInput> TargetSide => _boundedPropagatorBlock; |
| | | 12 | | |
| | 201738 | 13 | | protected override ISourceBlock<TOutput> SourceSide => _boundedPropagatorBlock; |
| | | 14 | | |
| | | 15 | | private readonly BoundedPropagatorBlock<TInput, TOutput> _boundedPropagatorBlock; |
| | | 16 | | private readonly BufferBlock<TOutput> _outputBuffer; |
| | | 17 | | |
| | 3 | 18 | | public int Count => _boundedPropagatorBlock.Count; |
| | | 19 | | |
| | 2 | 20 | | public int InputCount => Count - OutputCount; |
| | | 21 | | |
| | 5 | 22 | | public int OutputCount => _outputBuffer.Count; |
| | | 23 | | |
| | | 24 | | public int BoundedCapacity |
| | | 25 | | { |
| | 0 | 26 | | get => _boundedPropagatorBlock.BoundedCapacity; |
| | 0 | 27 | | set => _boundedPropagatorBlock.BoundedCapacity = value; |
| | | 28 | | } |
| | | 29 | | |
| | | 30 | | public NonOrderPreservingChoiceBlock( |
| | | 31 | | Predicate<TInput> predicate, |
| | | 32 | | IPropagatorBlock<TInput, TOutput> thenBlock, |
| | | 33 | | IPropagatorBlock<TInput, TOutput> elseBlock, |
| | | 34 | | ExecutionDataflowBlockOptions? options = null |
| | 1 | 35 | | ) : this( |
| | 100000 | 36 | | new TransformBlock<TInput, (bool, TInput)>(i => (predicate(i), i), |
| | 1 | 37 | | new() { |
| | 1 | 38 | | CancellationToken = options?.CancellationToken ?? CancellationToken.None, |
| | 1 | 39 | | SingleProducerConstrained = options?.SingleProducerConstrained ?? false |
| | 1 | 40 | | }), |
| | 1 | 41 | | thenBlock, |
| | 1 | 42 | | elseBlock, |
| | 1 | 43 | | options |
| | 1 | 44 | | ) |
| | | 45 | | { |
| | 1 | 46 | | } |
| | | 47 | | |
| | 13 | 48 | | public NonOrderPreservingChoiceBlock( |
| | 13 | 49 | | IPropagatorBlock<TInput, (bool P, TInput V)> conditionCheckBlock, |
| | 13 | 50 | | IPropagatorBlock<TInput, TOutput> thenBlock, |
| | 13 | 51 | | IPropagatorBlock<TInput, TOutput> elseBlock, |
| | 13 | 52 | | DataflowBlockOptions? options = null |
| | 13 | 53 | | ) |
| | | 54 | | { |
| | 13 | 55 | | options ??= new(); |
| | | 56 | | |
| | 13 | 57 | | _outputBuffer = new BufferBlock<TOutput>(new() { CancellationToken = options.CancellationToken }); |
| | | 58 | | |
| | 13 | 59 | | _boundedPropagatorBlock = new BoundedPropagatorBlock<TInput, TOutput>( |
| | 13 | 60 | | DataflowBlock.Encapsulate(conditionCheckBlock, _outputBuffer), |
| | 13 | 61 | | options.BoundedCapacity |
| | 13 | 62 | | ); |
| | | 63 | | |
| | 13 | 64 | | conditionCheckBlock = |
| | 13 | 65 | | conditionCheckBlock |
| | 13 | 66 | | .Pipeline() |
| | 13 | 67 | | .SelectLast(e => _boundedPropagatorBlock.CreateExit(e)) |
| | 13 | 68 | | .Build(); |
| | | 69 | | |
| | 13 | 70 | | conditionCheckBlock.Pipeline() |
| | 100361 | 71 | | .SynchronousFilter(t => t.P) |
| | 50209 | 72 | | .SynchronousTransform(t => t.V) |
| | 13 | 73 | | .LinkTo(thenBlock, new() { PropagateCompletion = true }); |
| | | 74 | | |
| | 13 | 75 | | conditionCheckBlock.Pipeline() |
| | 52270 | 76 | | .SynchronousFilter(t => !t.P) |
| | 50102 | 77 | | .SynchronousTransform(t => t.V) |
| | 13 | 78 | | .LinkTo(elseBlock, new() { PropagateCompletion = true }); |
| | | 79 | | |
| | 13 | 80 | | thenBlock |
| | 13 | 81 | | .Pipeline() |
| | 13 | 82 | | .WithNotification(h => { |
| | 50222 | 83 | | h.OnDeliveringMessages = count => _boundedPropagatorBlock.AdjustCount(count); |
| | 26 | 84 | | h.ConfigureDispatching = q => q.UseSynchronousDispatch(); |
| | 13 | 85 | | }) |
| | 13 | 86 | | .LinkTo(_outputBuffer, new()); |
| | | 87 | | |
| | 13 | 88 | | elseBlock |
| | 13 | 89 | | .Pipeline() |
| | 13 | 90 | | .WithNotification(h => { |
| | 50164 | 91 | | h.OnDeliveringMessages = count => _boundedPropagatorBlock.AdjustCount(count); |
| | 26 | 92 | | h.ConfigureDispatching = q => q.UseSynchronousDispatch(); |
| | 13 | 93 | | }) |
| | 13 | 94 | | .LinkTo(_outputBuffer, new()); |
| | | 95 | | |
| | | 96 | | |
| | 13 | 97 | | SetupCompletion(conditionCheckBlock.Completion, thenBlock, options.CancellationToken); |
| | 13 | 98 | | SetupCompletion(conditionCheckBlock.Completion, elseBlock, options.CancellationToken); |
| | | 99 | | |
| | 13 | 100 | | Task.Run(async () => |
| | 13 | 101 | | { |
| | 13 | 102 | | var t = Task.WhenAll( |
| | 13 | 103 | | conditionCheckBlock.Completion, |
| | 13 | 104 | | thenBlock.Completion, |
| | 13 | 105 | | elseBlock.Completion |
| | 13 | 106 | | ); |
| | 13 | 107 | | await Task.WhenAny(t); |
| | 12 | 108 | | if (!options.CancellationToken.IsCancellationRequested) |
| | 13 | 109 | | { |
| | 11 | 110 | | _ = t.PropagateCompletion(_outputBuffer); |
| | 13 | 111 | | } |
| | 25 | 112 | | }); |
| | 13 | 113 | | } |
| | | 114 | | |
| | | 115 | | private void SetupCompletion(Task completionAllowed, IPropagatorBlock<TInput, TOutput> myBranch, CancellationTok |
| | | 116 | | { |
| | 26 | 117 | | completionAllowed.PropagateCompletion(myBranch); |
| | 26 | 118 | | myBranch.Completion.ContinueWith(t => |
| | 26 | 119 | | { |
| | 22 | 120 | | if (cancellationToken.IsCancellationRequested) |
| | 26 | 121 | | { |
| | 0 | 122 | | return; |
| | 26 | 123 | | } |
| | 22 | 124 | | else if (t.IsFaulted) |
| | 26 | 125 | | { |
| | 6 | 126 | | Fault(t.Exception); |
| | 26 | 127 | | } |
| | 16 | 128 | | else if (!completionAllowed.IsCompleted) |
| | 26 | 129 | | { |
| | 2 | 130 | | Fault(new InvalidOperationException("Branch completed prematurely")); |
| | 26 | 131 | | } |
| | 42 | 132 | | }, cancellationToken); |
| | 26 | 133 | | } |
| | | 134 | | } |
| | | 135 | | } |