using CounterpointCollective.Collections;
using CounterpointCollective.Dataflow.Encapsulation;
using CounterpointCollective.Dataflow.Fluent;
using CounterpointCollective.Dataflow.Internal;
using CounterpointCollective.Dataflow.Notifying;
using System.Threading.Tasks.Dataflow;

namespace CounterpointCollective.Dataflow
{
    /// <summary>
    /// To prevent deadlocks, Source always accepts messages.
    /// The block itself, however, only accepts new messages while the combined count of
    /// Source and the joining side (using either the smallest or the largest queue count,
    /// depending on the configured mode) is below the capacity limit.
    ///
    /// Be careful when passing GuaranteedBroadcastBlockBoundedCapacityMode.LargestQueue in the options.
    /// It may trigger a deadlock in the following scenario:
    /// - Queue[1] is empty.
    /// - Queue[2] has items.
    /// - The ParallelBlock is full; it will not enqueue more messages before some are consumed.
    /// - Worker 1 must consume messages before it will produce its next message, e.g. because it is batching.
    /// - Worker 2 cannot consume more messages at the moment because its output side is full.
    /// The JoiningTarget will not consume messages until all workers have output available, causing a deadlock.
    /// </summary>
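    /// <example>
    /// A minimal usage sketch: two hypothetical TransformBlock workers whose results are
    /// recombined by addition. The worker delegates and the capacity value are
    /// illustrative, not part of this library.
    /// <code>
    /// var block = new ParallelBlock&lt;int, int, int&gt;(
    ///     degreeOfParallelism: 2,
    ///     recombine: results => results[0] + results[1],
    ///     options: new GuaranteedBroadcastBlockOptions { BoundedCapacity = 16 });
    ///
    /// // One worker per degree of parallelism; every input is broadcast to each worker.
    /// block.LinkWorkers(
    ///     [
    ///         new TransformBlock&lt;int, int&gt;(x => x * 2),
    ///         new TransformBlock&lt;int, int&gt;(x => x * 3),
    ///     ],
    ///     new DataflowLinkOptions { PropagateCompletion = true });
    /// </code>
    /// </example>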
    public class ParallelBlock<I, T, O> : AbstractEncapsulatedPropagatorBlock<I, O>
    {
        /// <exclude/>
        protected override ITargetBlock<I> TargetSide => _boundedPropagatorBlock;
        /// <exclude/>
        protected override ISourceBlock<O> SourceSide => _boundedPropagatorBlock;

        private readonly GuaranteedBroadcastBlock<I> _fanOutBlock;
        private readonly ITargetBlock<T> _fanInBlock;

        private int DegreeOfParallelism { get; }

        public int InputCount => _fanOutBlock.Count;

        public int OutputCount => _boundedPropagatorBlock.Count - InputCount;

        /// <summary>
        /// Items currently in the block, on either the input side or the output side.
        /// When they leave for the workers, they are no longer counted here;
        /// when they return from the workers, they are counted here again.
        /// This can cause the Count to be temporarily higher than the BoundedCapacity,
        /// just like in the TransformManyBlock.
        /// </summary>
        public int Count => _boundedPropagatorBlock.Count;

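        // Wraps the fan-out and fan-in sides behind a single propagator and enforces the
        // shared BoundedCapacity across both; AdjustCount keeps its item count in sync
        // as messages move between the two sides.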
        private readonly BoundedPropagatorBlock<I, O> _boundedPropagatorBlock;

        private readonly List<IPropagatorBlock<I, T>> _workers = [];
        private int i; // Index of the next worker to hook up.

        public ParallelBlock(
            int degreeOfParallelism,
            Func<T[], O> recombine,
            GuaranteedBroadcastBlockOptions options
        )
        {
            if (degreeOfParallelism < 2)
            {
                throw new ArgumentException("degreeOfParallelism must be at least 2.", nameof(degreeOfParallelism));
            }
            DegreeOfParallelism = degreeOfParallelism;

            _fanOutBlock = new GuaranteedBroadcastBlock<I>(
                degreeOfParallelism,
                new()
                {
                    CancellationToken = options.CancellationToken,
                    BoundedCapacityMode = options.BoundedCapacityMode,
                },
                onQueuesShrunk: diff => MessagesFannedOut(-diff)
            );

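            // Fan-in: a non-greedy BatchBlock with a batch size equal to the degree of
            // parallelism. Because the join is non-greedy, a batch is only consumed once
            // every worker has output available, so each batch holds exactly one result
            // per worker, which is then recombined into a single output.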
            var j = new BatchBlock<T>(degreeOfParallelism, new()
            {
                CancellationToken = options.CancellationToken,
                Greedy = false
            })
            .Pipeline()
            .Transform(
                b => recombine([.. b]),
                new ExecutionDataflowBlockOptions
                {
                    SingleProducerConstrained = true,
                    CancellationToken = options.CancellationToken
                }
            )
            .Build();
            _fanInBlock = j;

            _boundedPropagatorBlock = new BoundedPropagatorBlock<I, O>(_fanOutBlock, j, options.BoundedCapacity);
        }

        private void MessagesFannedOut(int diff) => _boundedPropagatorBlock.AdjustCount(-diff);

        private void MessagesFannedIn(int diff) => _boundedPropagatorBlock.AdjustCount(diff);

        public void Hookup(IPropagatorBlock<I, T> worker, DataflowLinkOptions options)
        {
            if (i == DegreeOfParallelism)
            {
                throw new InvalidOperationException($"All {DegreeOfParallelism} workers have already been hooked up.");
            }
            _fanOutBlock[i].LinkTo(worker, options);

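            // Only the first worker is wrapped with a notification pipeline: the
            // non-greedy fan-in consumes one result from every worker per batch, so
            // observing deliveries from a single worker is enough to count each joined
            // batch exactly once.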
            worker
                = i == 0
                ? worker
                    .Pipeline()
                    .WithNotification(h =>
                    {
                        h.OnDeliveringMessages = count => MessagesFannedIn(count);
                        h.ConfigureDispatching = q => q.UseSynchronousDispatch();
                    })
                    .Build()
                : worker;

            _workers.Add(worker);

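            // A faulted worker propagates its failure to the fan-in immediately; regular
            // completion is propagated only once all workers have completed (see the
            // WhenAll below).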
            worker.LinkTo(_fanInBlock);
            worker.PropagateCompletion(TaskContinuationOptions.OnlyOnFaulted, _fanInBlock);
            i++;
            if (i == DegreeOfParallelism)
            {
                Task.WhenAll(_workers.Select(e => e.Completion))
                    .PropagateCompletion(_fanInBlock);
            }
        }

        /// <summary>
        /// Convenience method to link all the workers in one go.
        /// </summary>
        public void LinkWorkers(IPropagatorBlock<I, T>[] workers, DataflowLinkOptions options)
        {
            if (workers.Length != DegreeOfParallelism)
            {
                throw new ArgumentException($"Exactly {DegreeOfParallelism} workers are required.", nameof(workers));
            }

            foreach (var worker in workers)
            {
                Hookup(worker, options);
            }
        }
    }
}