| | | 1 | | using CounterpointCollective.Collections; |
| | | 2 | | using CounterpointCollective.Dataflow.Encapsulation; |
| | | 3 | | using CounterpointCollective.Dataflow.Fluent; |
| | | 4 | | using CounterpointCollective.Dataflow.Internal; |
| | | 5 | | using CounterpointCollective.Dataflow.Notifying; |
| | | 6 | | using System; |
| | | 7 | | using System.Collections.Generic; |
| | | 8 | | using System.Linq; |
| | | 9 | | using System.Threading.Tasks; |
| | | 10 | | using System.Threading.Tasks.Dataflow; |
| | | 11 | | |
| | | 12 | | namespace CounterpointCollective.Dataflow |
| | | 13 | | { |
| | | 14 | | public enum GuaranteedBroadcastBlockBoundedCapacityMode |
| | | 15 | | { |
| | | 16 | | SmallestQueue, |
| | | 17 | | LargestQueue |
| | | 18 | | } |
| | | 19 | | |
| | | 20 | | public class GuaranteedBroadcastBlockOptions : DataflowBlockOptions |
| | | 21 | | { |
| | | 22 | | /// <summary> |
| | | 23 | | /// Whether to use the smallest or largest queue size for bounded capacity. |
| | | 24 | | /// Defaults to <see cref="GuaranteedBroadcastBlockBoundedCapacityMode.SmallestQueue"/>, |
| | | 25 | | /// meaning the fastest consuming block will determine the Count. |
| | | 26 | | /// |
| | | 27 | | /// If you set it to <see cref="GuaranteedBroadcastBlockBoundedCapacityMode.LargestQueue"/>, |
| | | 28 | | /// the slowest consuming block will determine when the Count. |
| | | 29 | | /// </summary> |
| | 29 | 30 | | public GuaranteedBroadcastBlockBoundedCapacityMode BoundedCapacityMode { get; set; } = GuaranteedBroadcastBlockB |
| | | 31 | | } |
| | | 32 | | |
| | | 33 | | /// <summary> |
| | | 34 | | /// Guaranteed delivery of messages to NrOfSources sources. |
| | | 35 | | /// Queueing up messages in the sources until the smallest queue reached BoundedCapacity. |
| | | 36 | | /// </summary> |
| | | 37 | | |
| | | 38 | | public sealed class GuaranteedBroadcastBlock<T> : AbstractEncapsulatedTargetBlock<T> |
| | | 39 | | { |
| | | 40 | | private object Lock { get; } = new(); |
| | | 41 | | private readonly UniquePriorityQueue<int, long, long> _totalMessagesDeliveredPerQueue; |
| | | 42 | | private readonly NotifyingPropagatorBlock<T, T>[] _queues; |
| | | 43 | | public IReceivableSourceBlock<T> this[int index] => _queues[index]; |
| | | 44 | | |
| | | 45 | | /// <summary> |
| | | 46 | | /// Size of the smallest/largest queue, depending on BoundedCapacityMode. |
| | | 47 | | /// </summary> |
| | | 48 | | public int Count |
| | | 49 | | { |
| | | 50 | | get |
| | | 51 | | { |
| | | 52 | | foreach (var q in _queues) |
| | | 53 | | { |
| | | 54 | | q.ProcessPendingEvents(); |
| | | 55 | | } |
| | | 56 | | return _bounded.Count; |
| | | 57 | | } |
| | | 58 | | } |
| | | 59 | | private readonly Action<int>? _onQueuesShrunk; |
| | | 60 | | private readonly BoundedTargetBlock<T> _bounded; |
| | | 61 | | |
| | | 62 | | /// <exclude/> |
| | | 63 | | public override Task Completion { get; } |
| | | 64 | | |
| | | 65 | | /// <exclude/> |
| | | 66 | | protected override ITargetBlock<T> TargetSide => _bounded; |
| | | 67 | | |
| | | 68 | | /// <exclude/> |
| | | 69 | | protected override IDataflowBlock CompletionSide => this; |
| | | 70 | | |
| | | 71 | | public GuaranteedBroadcastBlock( |
| | | 72 | | int nrOfSources, |
| | | 73 | | GuaranteedBroadcastBlockOptions options, |
| | | 74 | | Action<int>? onQueuesShrunk = null |
| | | 75 | | ) |
| | | 76 | | { |
| | | 77 | | _onQueuesShrunk = onQueuesShrunk; |
| | | 78 | | |
| | | 79 | | _queues = new NotifyingPropagatorBlock<T,T>[nrOfSources]; |
| | | 80 | | if (options.BoundedCapacityMode == GuaranteedBroadcastBlockBoundedCapacityMode.LargestQueue) |
| | | 81 | | { |
| | | 82 | | //In largest queue mode, the queue with the most total messages deliveries determines the Count. |
| | | 83 | | _totalMessagesDeliveredPerQueue = new(); |
| | | 84 | | } |
| | | 85 | | else |
| | | 86 | | { |
| | | 87 | | //In smallest queue mode, the queue with the least total messages deliveries determines the Count, |
| | | 88 | | _totalMessagesDeliveredPerQueue = new(Comparer<long>.Create((x, y) => y.CompareTo(x))); |
| | | 89 | | } |
| | | 90 | | |
| | | 91 | | for (var i = 0; i < nrOfSources; i++) |
| | | 92 | | { |
| | | 93 | | var j = i; |
| | | 94 | | _queues[i] = |
| | | 95 | | new BufferBlock<T>(new() { CancellationToken = options.CancellationToken }) |
| | | 96 | | .WithNotification(h => |
| | | 97 | | { |
| | | 98 | | h.OnDeliveringMessages = count => HandleMessageDelivery(count, j); |
| | | 99 | | h.ConfigureDispatching = q => q.UseInlineFirstThenAsyncDispatch(); |
| | | 100 | | }); |
| | | 101 | | |
| | | 102 | | |
| | | 103 | | _totalMessagesDeliveredPerQueue.Enqueue(i, 0, 0); |
| | | 104 | | } |
| | | 105 | | |
| | | 106 | | var a = new ActionBlock<T>(m => |
| | | 107 | | { |
| | | 108 | | for (var i = 0; i < nrOfSources; i++) |
| | | 109 | | { |
| | | 110 | | _queues[i].PostAsserted(m); |
| | | 111 | | } |
| | | 112 | | }, new() { CancellationToken = options.CancellationToken, SingleProducerConstrained = true }); |
| | | 113 | | _bounded = new(a, options.BoundedCapacity); |
| | | 114 | | |
| | | 115 | | var tcs = new TaskCompletionSource(); |
| | | 116 | | Completion = tcs.Task; |
| | | 117 | | Task.Run(async () => |
| | | 118 | | { |
| | | 119 | | var t = await Task.WhenAny(a.Completion); |
| | | 120 | | _ = t.PropagateCompletion(_queues); |
| | | 121 | | await Task.WhenAny(Task.WhenAll(_queues.Select(b => b.Completion))); |
| | | 122 | | if (options.CancellationToken.IsCancellationRequested) |
| | | 123 | | { |
| | | 124 | | tcs.SetCanceled(); |
| | | 125 | | } |
| | | 126 | | else |
| | | 127 | | { |
| | | 128 | | await t.PropagateCompletionAsync(tcs); |
| | | 129 | | } |
| | | 130 | | }); |
| | | 131 | | } |
| | | 132 | | |
| | | 133 | | private void HandleMessageDelivery(int count, int queueIndex) |
| | | 134 | | { |
| | | 135 | | lock (Lock) |
| | | 136 | | { |
| | | 137 | | //Remember the previous smallest/ largest queue total delivery |
| | | 138 | | var (_, totalDelivered, _) = _totalMessagesDeliveredPerQueue.Peek(); |
| | | 139 | | |
| | | 140 | | //Update my total delivered count |
| | | 141 | | var (myTotalDelivered, _) = _totalMessagesDeliveredPerQueue.Get(queueIndex); |
| | | 142 | | myTotalDelivered += count; |
| | | 143 | | _totalMessagesDeliveredPerQueue.Enqueue(queueIndex, myTotalDelivered, myTotalDelivered); |
| | | 144 | | |
| | | 145 | | //If the smallest/ largest queue total delivery changed, update blocks Count property. |
| | | 146 | | (_, var newTotalDelivered, _) = _totalMessagesDeliveredPerQueue.Peek(); |
| | | 147 | | var diff = (int)(totalDelivered - newTotalDelivered); |
| | | 148 | | if (diff < 0) |
| | | 149 | | { |
| | | 150 | | _bounded.AdjustCount(diff); |
| | | 151 | | _onQueuesShrunk?.Invoke(diff); |
| | | 152 | | } |
| | | 153 | | } |
| | | 154 | | } |
| | | 155 | | } |
| | | 156 | | } |