| | | 1 | | using CounterpointCollective.Collections; |
| | | 2 | | using CounterpointCollective.Dataflow.Internal; |
| | | 3 | | using System.Diagnostics.CodeAnalysis; |
| | | 4 | | using System.Threading.Tasks.Dataflow; |
| | | 5 | | |
| | | 6 | | namespace CounterpointCollective.Dataflow.Notifying |
| | | 7 | | { |
| | | 8 | | /// <summary> |
| | | 9 | | /// Notifies via Hooks when a message is sent to a target block and when a reservation is released. |
| | | 10 | | /// |
| | | 11 | | /// The events are sent in order. |
| | | 12 | | /// </summary> |
| | | 13 | | public class NotifyingSourceBlock<T> : IReceivableSourceBlock<T> |
| | | 14 | | { |
// Queue through which hook notifications are dispatched.
private readonly CoalescingQueue<int> _eventQueue;

// The wrapped source block; dataflow operations on this block are forwarded to it.
private ISourceBlock<T> InnerBlock { get; }

// Backs the public Completion task. Continuations are forced to run asynchronously so that
// code awaiting Completion never executes inline on the thread that completes this block.
private readonly TaskCompletionSource _tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);

// Queue handles used to record events; each stays null when the corresponding hook is not configured.
private readonly ICoalescingQueueHandle<int, int>? _deliveringMessagesHandle;
private readonly ICoalescingQueueHandle<int, int>? _reservationReleasedHandle;
| | | 23 | | |
/// <summary>
/// Shared initialization for the public constructors: stores the inner block, creates the event
/// queue and wires this block's completion to the inner block and the event queue.
/// </summary>
/// <param name="innerBlock">The source block to wrap.</param>
/// <exception cref="ArgumentNullException"><paramref name="innerBlock"/> is <c>null</c>.</exception>
private NotifyingSourceBlock(ISourceBlock<T> innerBlock)
{
    ArgumentNullException.ThrowIfNull(innerBlock);

    InnerBlock = innerBlock;
    _eventQueue = new();

    // Completion pump: once the inner block is done, close the event queue, wait for both to
    // finish and mirror the combined outcome onto _tcs.
    _ = Task.Run(async () =>
    {
        try
        {
            // Task.WhenAny does not throw when the awaited task faulted or was canceled.
            await Task.WhenAny(InnerBlock.Completion);
            _eventQueue.Complete();
            var t = await Task.WhenAny(Task.WhenAll(InnerBlock.Completion, _eventQueue.Completion));
            await t.PropagateCompletionAsync(_tcs);
        }
        catch (Exception ex)
        {
            // Nothing observes this fire-and-forget task; without this, an unexpected failure
            // would leave Completion pending forever.
            _tcs.TrySetException(ex);
        }
    });

    // A failure while delivering events faults the inner block. The scheduler is passed
    // explicitly so the continuation does not depend on the ambient TaskScheduler.Current.
    _ = _eventQueue.Completion.ContinueWith(
        t =>
        {
            Fault(new InvalidOperationException($"Event delivery failed: {t.Exception!.Message}", t.Exception));
        },
        CancellationToken.None,
        TaskContinuationOptions.OnlyOnFaulted,
        TaskScheduler.Default);
}
| | | 43 | | |
/// <summary>
/// Wraps <paramref name="innerBlock"/> and invokes the synchronous hooks configured through
/// <paramref name="c"/>.
/// </summary>
/// <param name="innerBlock">The source block to wrap.</param>
/// <param name="c">Callback that configures the hooks for this block.</param>
/// <exception cref="ArgumentNullException"><paramref name="c"/> is <c>null</c>.</exception>
public NotifyingSourceBlock(ISourceBlock<T> innerBlock, ConfigureHooks<T> c) : this(innerBlock)
{
    ArgumentNullException.ThrowIfNull(c);

    var h = new NotifyingSourceBlockHooks<T>(this);
    c(h);
    var hooks = h with { }; //clone to prevent further modification
    hooks.ConfigureDispatching(_eventQueue);

    // Capture each configured delegate once; a handle is only created for hooks that are set.
    if (hooks.OnDeliveringMessages is { } onDeliveringMessages)
    {
        _deliveringMessagesHandle = _eventQueue.CreateHandle(
            true,
            static (ref v) => ref v,
            (in entry) => onDeliveringMessages(entry.GetValue())
        );
    }

    if (hooks.OnReservationReleased is { } onReservationReleased)
    {
        _reservationReleasedHandle = _eventQueue.CreateHandle(
            true,
            static (ref v) => ref v,
            (in _) => onReservationReleased()
        );
    }
}
| | | 69 | | |
/// <summary>
/// Wraps <paramref name="innerBlock"/> and invokes the asynchronous hooks configured through
/// <paramref name="c"/>.
/// </summary>
/// <param name="innerBlock">The source block to wrap.</param>
/// <param name="c">Callback that configures the asynchronous hooks for this block.</param>
/// <exception cref="ArgumentNullException"><paramref name="c"/> is <c>null</c>.</exception>
public NotifyingSourceBlock(ISourceBlock<T> innerBlock, ConfigureAsyncHooks<T> c) : this(innerBlock)
{
    ArgumentNullException.ThrowIfNull(c);

    var h = new NotifyingSourceBlockAsyncHooks<T>(this);
    c(h);
    var hooks = h with { }; //clone to prevent further modification
    hooks.ConfigureDispatching(_eventQueue);

    // Capture each configured delegate once; a handle is only created for hooks that are set.
    if (hooks.OnDeliveringMessagesAsync is { } onDeliveringMessagesAsync)
    {
        _deliveringMessagesHandle = _eventQueue.CreateHandle(
            true,
            static (ref v) => ref v,
            async entry => await onDeliveringMessagesAsync(entry.GetValue())
        );
    }

    if (hooks.OnReservationReleasedAsync is { } onReservationReleasedAsync)
    {
        _reservationReleasedHandle = _eventQueue.CreateHandle(
            true,
            static (ref v) => ref v,
            async _ => await onReservationReleasedAsync()
        );
    }
}
| | | 95 | | |
/// <summary>
/// Links the inner block to <paramref name="target"/> through a wrapper that records an event
/// for every message the target accepts.
/// </summary>
/// <param name="target">The block that should receive messages.</param>
/// <param name="linkOptions">Link options, passed unchanged to the inner block.</param>
/// <returns>The link handle returned by the inner block.</returns>
/// <exception cref="ArgumentNullException"><paramref name="target"/> is <c>null</c>.</exception>
/// <exclude/>
public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
{
    // The inner block only ever sees the (non-null) wrapper, so a null target has to be rejected
    // here; otherwise it would surface later as a NullReferenceException while offering a message.
    ArgumentNullException.ThrowIfNull(target);

    var wrapper = new TargetWrapper(this, target);
    return InnerBlock.LinkTo(wrapper, linkOptions);
}
| | | 102 | | |
/// <summary>The task exposed by this block's own completion source.</summary>
/// <exclude/>
public Task Completion
{
    get { return _tcs.Task; }
}

/// <summary>Requests completion by forwarding the call to the inner block.</summary>
/// <exclude/>
public void Complete()
{
    InnerBlock.Complete();
}
| | | 108 | | |
/// <summary>
/// Forwards the consume request to the inner block and, when the message was consumed, records
/// one delivered message on the event queue.
/// </summary>
/// <param name="messageHeader">Header of the message being consumed.</param>
/// <param name="target">The target consuming the message.</param>
/// <param name="messageConsumed">Set by the inner block; <c>true</c> when the message was consumed.</param>
/// <returns>The value returned by the inner block.</returns>
/// <exclude/>
public T? ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target, out bool messageConsumed)
{
    // No hook configured, or the queue no longer accepts entries: plain pass-through.
    if (_deliveringMessagesHandle is null || !_deliveringMessagesHandle.EnqueueIfNotCompleted(out var entry))
    {
        return InnerBlock.ConsumeMessage(messageHeader, target, out messageConsumed);
    }

    try
    {
        var ret = InnerBlock.ConsumeMessage(messageHeader, target, out messageConsumed);
        if (messageConsumed)
        {
            entry.MarkForDispatch();
            Interlocked.Increment(ref entry.Value);
        }
        return ret;
    }
    finally
    {
        // Always dispose the entry, even when the inner block throws.
        entry.Dispose();
    }
}
| | | 128 | | |
/// <summary>Faults this block by forwarding <paramref name="exception"/> to the inner block.</summary>
/// <exclude/>
public void Fault(Exception exception)
{
    InnerBlock.Fault(exception);
}
| | | 131 | | |
/// <summary>
/// Forwards the release to the inner block and, when a reservation-released hook is configured,
/// records the release on the event queue.
/// </summary>
/// <param name="messageHeader">Header of the message whose reservation is released.</param>
/// <param name="target">The target that held the reservation.</param>
/// <exclude/>
public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
{
    // No hook configured, or the queue no longer accepts entries: plain pass-through.
    if (_reservationReleasedHandle is null || !_reservationReleasedHandle.EnqueueIfNotCompleted(out var entry))
    {
        InnerBlock.ReleaseReservation(messageHeader, target);
        return;
    }

    try
    {
        InnerBlock.ReleaseReservation(messageHeader, target);
        // Only reached when the inner block released the reservation without throwing.
        entry.MarkForDispatch();
    }
    finally
    {
        // Always dispose the entry, even when the inner block throws.
        entry.Dispose();
    }
}
| | | 146 | | |
/// <summary>Forwards the reservation request to the inner block and returns its answer.</summary>
/// <exclude/>
public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
{
    return InnerBlock.ReserveMessage(messageHeader, target);
}
| | | 150 | | |
/// <summary>
/// Tries to receive one item from the inner block and, on success, records one delivered
/// message on the event queue.
/// </summary>
/// <param name="filter">Optional predicate, passed unchanged to the inner block.</param>
/// <param name="item">The received item when the method returns <c>true</c>.</param>
/// <returns>
/// <c>true</c> when an item was received; <c>false</c> otherwise, including when the inner block
/// does not implement <see cref="IReceivableSourceBlock{T}"/>.
/// </returns>
/// <exclude/>
public bool TryReceive(Predicate<T>? filter, [MaybeNullWhen(false)] out T item)
{
    if (InnerBlock is not IReceivableSourceBlock<T> receivable)
    {
        item = default;
        return false;
    }

    // No hook configured, or the queue no longer accepts entries: plain pass-through.
    if (_deliveringMessagesHandle is null || !_deliveringMessagesHandle.EnqueueIfNotCompleted(out var entry))
    {
        return receivable.TryReceive(filter, out item);
    }

    try
    {
        var received = receivable.TryReceive(filter, out item);
        if (received)
        {
            entry.MarkForDispatch();
            Interlocked.Increment(ref entry.Value);
        }
        return received;
    }
    finally
    {
        // Always dispose the entry, even when the filter or the inner block throws.
        entry.Dispose();
    }
}
| | | 177 | | |
/// <summary>
/// Tries to receive all available items from the inner block and, on success, records the
/// number of received items on the event queue.
/// </summary>
/// <param name="items">The received items when the method returns <c>true</c>.</param>
/// <returns>
/// <c>true</c> when items were received; <c>false</c> otherwise, including when the inner block
/// does not implement <see cref="IReceivableSourceBlock{T}"/>.
/// </returns>
/// <exclude/>
public bool TryReceiveAll([NotNullWhen(true)] out IList<T>? items)
{
    if (InnerBlock is not IReceivableSourceBlock<T> receivable)
    {
        items = null;
        return false;
    }

    // No hook configured, or the queue no longer accepts entries: plain pass-through.
    if (_deliveringMessagesHandle is null || !_deliveringMessagesHandle.EnqueueIfNotCompleted(out var entry))
    {
        return receivable.TryReceiveAll(out items);
    }

    try
    {
        var received = receivable.TryReceiveAll(out items);
        if (received)
        {
            entry.MarkForDispatch();
            // The whole batch is recorded on a single entry.
            Interlocked.Add(ref entry.Value, items!.Count);
        }
        return received;
    }
    finally
    {
        // Always dispose the entry, even when the inner block throws.
        entry.Dispose();
    }
}
| | | 205 | | |
/// <summary>
/// Returns a <see cref="ValueTask"/> obtained from the event queue's
/// <c>AwaitQueueProcessed</c>, which can be awaited to wait for the queued events.
/// </summary>
/// <remarks>
/// Use this to synchronize with event delivery before proceeding.
/// NOTE(review): the exact completion semantics are defined by the event queue implementation,
/// which is not visible here; presumably the task completes once the events queued at the time
/// of the call have been handled, and immediately when nothing is pending. Confirm against the queue.
/// </remarks>
/// <returns>The value task produced by the event queue.</returns>
public ValueTask ProcessPendingEventsAsync()
{
    return _eventQueue.AwaitQueueProcessed();
}
| | | 216 | | |
/// <summary>
/// Synchronous counterpart of <see cref="ProcessPendingEventsAsync"/>: blocks the calling thread
/// until the pending-events task has completed.
/// </summary>
/// <remarks>
/// This blocks on asynchronous work; prefer <see cref="ProcessPendingEventsAsync"/> from
/// asynchronous code.
/// </remarks>
/// <exception cref="AggregateException">The pending-events task faulted or was canceled.</exception>
public void ProcessPendingEvents()
{
    var pending = ProcessPendingEventsAsync();

    if (pending.IsCompletedSuccessfully)
    {
        // Fast path without allocating a Task. GetResult cannot throw here and consumes the
        // ValueTask, which is required exactly once for every ValueTask.
        pending.GetAwaiter().GetResult();
        return;
    }

    // Still running, or already completed with a fault/cancellation: Wait() blocks if needed and
    // surfaces a failure instead of silently dropping it.
    pending.AsTask().Wait();
}
| | | 225 | | |
| | 468 | 226 | | private sealed class TargetWrapper(NotifyingSourceBlock<T> outer, ITargetBlock<T> target) : ITargetBlock<T> |
| | | 227 | | { |
// Offers a message to the wrapped target, passing the outer NotifyingSourceBlock as the source
// (instead of the `source` argument) so that follow-up calls from the target go through the outer block.
// When the guarded branch is taken and the target reports Accepted, the queue entry is marked for
// dispatch and its value is incremented by one; the entry is disposed before returning.
// Otherwise the offer is forwarded without recording anything.
// NOTE(review): the condition line below is cut off in this listing (it ends in "Enque");
// it presumably calls EnqueueIfNotCompleted(out var e) like the other members - confirm against the real file.
// NOTE(review): e.Dispose() is not inside a finally block, so an exception thrown by
// target.OfferMessage would skip it - consider try/finally.
| | | 228 | | public DataflowMessageStatus OfferMessage( |
| | | 229 | | DataflowMessageHeader messageHeader, |
| | | 230 | | T messageValue, |
| | | 231 | | ISourceBlock<T>? source, |
| | | 232 | | bool consumeToAccept |
| | | 233 | | ) |
| | | 234 | | { |
// Only taken when consumeToAccept is false.
| | 354446 | 235 | | if (!consumeToAccept && outer._deliveringMessagesHandle != null && outer._deliveringMessagesHandle.Enque
| | | 236 | | { |
| | 354446 | 237 | | var res = target.OfferMessage(messageHeader, messageValue, outer, consumeToAccept); |
| | 354446 | 238 | | if (res == DataflowMessageStatus.Accepted) |
| | | 239 | | { |
| | 297744 | 240 | | e.MarkForDispatch(); |
| | 297744 | 241 | | Interlocked.Increment(ref e.Value); |
| | | 242 | | } |
| | 354446 | 243 | | e.Dispose(); |
| | 354446 | 244 | | return res; |
| | | 245 | | } |
| | | 246 | | else |
| | | 247 | | { |
| | 0 | 248 | | return target.OfferMessage(messageHeader, messageValue, outer, consumeToAccept); |
| | | 249 | | } |
| | | 250 | | } |
| | | 251 | | |
/// <summary>Completion task of the wrapped target.</summary>
public Task Completion
{
    get { return target.Completion; }
}

// When the inner source propagates completion to this wrapper, forwarding to the real target is
// tied to the outer block's Completion, because the outer block may still have pending events
// to deliver.
public void Complete()
{
    ForwardOuterCompletion();
}

// Handled exactly like Complete(): the exception argument is not used here; the target is
// linked to the outer block's Completion instead.
public void Fault(Exception exception)
{
    ForwardOuterCompletion();
}

// Links the outer block's Completion task to the wrapped target.
private void ForwardOuterCompletion()
{
    outer.Completion.PropagateCompletion(target);
}
| | | 265 | | } |
| | | 266 | | } |
| | | 267 | | } |