< Summary

Information
Class: CounterpointCollective.Dataflow.Notifying.NotifyingSourceBlock<T>
Assembly: Dataflow.Composable
File(s): /builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/Notifying/NotifyingSourceBlock.cs
Line coverage
87%
Covered lines: 101
Uncovered lines: 14
Coverable lines: 115
Total lines: 267
Line coverage: 87.8%
Branch coverage
78%
Covered branches: 41
Total branches: 52
Branch coverage: 78.8%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
get_InnerBlock() | 100% | 1 | 1 | 100%
.ctor(...) | 100% | 1 | 1 | 100%
.ctor(...) | 100% | 8 | 8 | 94.44%
.ctor(...) | 62.5% | 9 | 8 | 72.22%
LinkTo(...) | 100% | 1 | 1 | 100%
get_Completion() | 100% | 1 | 1 | 100%
Complete() | 100% | 1 | 1 | 100%
ConsumeMessage(...) | 100% | 6 | 6 | 100%
Fault(...) | 100% | 1 | 1 | 100%
ReleaseReservation(...) | 100% | 4 | 4 | 100%
ReserveMessage(...) | 100% | 1 | 1 | 100%
TryReceive(...) | 62.5% | 9 | 8 | 72.72%
TryReceiveAll(...) | 87.5% | 8 | 8 | 81.81%
ProcessPendingEventsAsync() | 100% | 1 | 1 | 100%
ProcessPendingEvents() | 50% | 2 | 2 | 75%
.ctor(...) | 100% | 1 | 1 | 100%
OfferMessage(...) | 62.5% | 8 | 8 | 87.5%
get_Completion() | 100% | 2 | 1 | 0%
Complete() | 100% | 1 | 1 | 100%
Fault(...) | 100% | 1 | 1 | 100%

File(s)

/builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/Notifying/NotifyingSourceBlock.cs

#LineLine coverage
 1using CounterpointCollective.Collections;
 2using CounterpointCollective.Dataflow.Internal;
 3using System.Diagnostics.CodeAnalysis;
 4using System.Threading.Tasks.Dataflow;
 5
 6namespace CounterpointCollective.Dataflow.Notifying
 7{
 8    /// <summary>
 9    /// Notifies via Hooks when a message is sent to a target block and when a reservation is released.
 10    ///
 11    /// The events are sent in order.
 12    /// </summary>
 13    public class NotifyingSourceBlock<T> : IReceivableSourceBlock<T>
 14    {
 15        private readonly CoalescingQueue<int> _eventQueue;
 16
 2042617        private ISourceBlock<T> InnerBlock { get; }
 18
 18519        private readonly TaskCompletionSource _tcs = new();
 20
 21        private readonly ICoalescingQueueHandle<int, int>? _deliveringMessagesHandle;
 22        private readonly ICoalescingQueueHandle<int, int>? _reservationReleasedHandle;
 23
 18524        private NotifyingSourceBlock(ISourceBlock<T> innerBlock)
 25        {
 18526            var h = new NotifyingSourceBlockHooks<T>(this);
 18527            InnerBlock = innerBlock;
 18528            _eventQueue = new();
 29
 18530            Task.Run(async () =>
 18531            {
 18532                await Task.WhenAny(InnerBlock.Completion);
 13333                _eventQueue.Complete();
 13334                var t = await Task.WhenAny(Task.WhenAll(InnerBlock.Completion, _eventQueue.Completion));
 13335                await t.PropagateCompletionAsync(_tcs);
 31836            });
 37
 18538            _eventQueue.Completion.ContinueWith(t =>
 18539            {
 140                Fault(new InvalidOperationException($"Event delivery failed: {t.Exception!.Message}", t.Exception));
 18641            }, TaskContinuationOptions.OnlyOnFaulted);
 18542        }
 43
 18444        public NotifyingSourceBlock(ISourceBlock<T> innerBlock, ConfigureHooks<T> c):this(innerBlock)
 45        {
 18446            var h = new NotifyingSourceBlockHooks<T>(this);
 18447            c(h);
 18448            var hooks = h with { }; //clone to prevent further modification
 18449            hooks.ConfigureDispatching(_eventQueue);
 50
 18451            if (hooks.OnDeliveringMessages != null)
 52            {
 18453                _deliveringMessagesHandle = _eventQueue.CreateHandle(
 18454                    true,
 63395555                    static (ref v) => ref v,
 31697756                    (in c) => hooks.OnDeliveringMessages(c.GetValue())
 18457                );
 58            }
 59
 18460            if (hooks.OnReservationReleased != null)
 61            {
 762                _reservationReleasedHandle = _eventQueue.CreateHandle(
 763                    true,
 064                    static (ref v) => ref v,
 265                    (in _) => hooks.OnReservationReleased()
 766                );
 67            }
 18468        }
 69
 170        public NotifyingSourceBlock(ISourceBlock<T> innerBlock, ConfigureAsyncHooks<T> c) : this(innerBlock)
 71        {
 172            var h = new NotifyingSourceBlockAsyncHooks<T>(this);
 173            c(h);
 174            var hooks = h with { }; //clone to prevent further modification
 175            hooks.ConfigureDispatching(_eventQueue);
 76
 177            if (hooks.OnDeliveringMessagesAsync != null)
 78            {
 179                _deliveringMessagesHandle = _eventQueue.CreateHandle(
 180                    true,
 281                    (ref v) => ref v,
 182                    async c => await hooks.OnDeliveringMessagesAsync(c.GetValue())
 183                );
 84            }
 85
 186            if (hooks.OnReservationReleasedAsync != null)
 87            {
 088                _reservationReleasedHandle = _eventQueue.CreateHandle(
 089                    true,
 090                    (ref v) => ref v,
 091                    async _ => await hooks.OnReservationReleasedAsync()
 092                );
 93            }
 194        }
 95
 96        /// <exclude/>
 97        public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
 98        {
 46899            var t = new TargetWrapper(this, target);
 468100            return InnerBlock.LinkTo(t, linkOptions);
 101        }
 102
 103        /// <exclude/>
 249104        public Task Completion => _tcs.Task;
 105
 106        /// <exclude/>
 22107        public void Complete() => InnerBlock.Complete();
 108
 109        /// <exclude/>
 110        public T? ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target, out bool messageConsumed)
 111        {
 4226112            if (_deliveringMessagesHandle != null && _deliveringMessagesHandle.EnqueueIfNotCompleted(out var e))
 113            {
 4225114                var ret = InnerBlock.ConsumeMessage(messageHeader, target, out messageConsumed);
 4225115                if (messageConsumed)
 116                {
 4225117                    e.MarkForDispatch();
 4225118                    Interlocked.Increment(ref e.Value);
 119                }
 4225120                e.Dispose();
 4225121                return ret;
 122            }
 123            else
 124            {
 1125                return InnerBlock.ConsumeMessage(messageHeader, target, out messageConsumed);
 126            }
 127        }
 128
 129        /// <exclude/>
 1130        public void Fault(Exception exception) => InnerBlock.Fault(exception);
 131
 132        /// <exclude/>
 133        public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
 134        {
 4135            if (_reservationReleasedHandle != null && _reservationReleasedHandle.EnqueueIfNotCompleted(out var e))
 136            {
 2137                InnerBlock.ReleaseReservation(messageHeader, target);
 2138                e.MarkForDispatch();
 2139                e.Dispose();
 140            }
 141            else
 142            {
 2143                InnerBlock.ReleaseReservation(messageHeader, target);
 144            }
 2145        }
 146
 147        /// <exclude/>
 148        public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
 54149            => InnerBlock.ReserveMessage(messageHeader, target);
 150
 151        /// <exclude/>
 152        public bool TryReceive(Predicate<T>? filter, [MaybeNullWhen(false)] out T item)
 153        {
 14855154            if (InnerBlock is IReceivableSourceBlock<T> r)
 155            {
 14855156                if (_deliveringMessagesHandle != null && _deliveringMessagesHandle.EnqueueIfNotCompleted(out var e))
 157                {
 14855158                    var res = r.TryReceive(filter, out item);
 14855159                    if (res)
 160                    {
 14728161                        e.MarkForDispatch();
 14728162                        Interlocked.Increment(ref e.Value);
 163                    }
 14855164                    e.Dispose();
 14855165                    return res;
 166                } else
 167                {
 0168                    return r.TryReceive(filter, out item);
 169                }
 170            }
 171            else
 172            {
 0173                item = default;
 0174                return false;
 175            }
 176        }
 177
 178        /// <exclude/>
 179        public bool TryReceiveAll([NotNullWhen(true)] out IList<T>? items)
 180        {
 478181            if (InnerBlock is IReceivableSourceBlock<T> r)
 182            {
 478183                if (_deliveringMessagesHandle != null && _deliveringMessagesHandle.EnqueueIfNotCompleted(out var e))
 184                {
 475185                    var res = r.TryReceiveAll(out items);
 475186                    if (res)
 187                    {
 282188                        e.MarkForDispatch();
 282189                        Interlocked.Add(ref e.Value, items!.Count);
 190                    }
 475191                    e.Dispose();
 475192                    return res;
 193                } else
 194                {
 3195                    return r.TryReceiveAll(out items);
 196                }
 197
 198            }
 199            else
 200            {
 0201                items = null;
 0202                return false;
 203            }
 204        }
 205
 206        /// <summary>
 207        /// Processes all pending events in the event queue and blocks until all events have been handled.
 208        /// </summary>
 209        /// <remarks>This method ensures that any events currently queued are processed before returning.
 210        /// If all events have already been processed, the method returns immediately. Otherwise, it waits until the last event
 211        /// queued is processed. This method is typically used to synchronize with the event system and guarantee that no events
 212        /// remain before proceeding.
 213        /// </remarks>
 214        public ValueTask ProcessPendingEventsAsync()
 4823215            =>  _eventQueue.AwaitQueueProcessed();
 216
 217        public void ProcessPendingEvents()
 218        {
 4808219            var v = ProcessPendingEventsAsync();
 4808220            if (!v.IsCompleted)
 221            {
 0222                v.AsTask().Wait();
 223            }
 4808224        }
 225
 468226        private sealed class TargetWrapper(NotifyingSourceBlock<T> outer, ITargetBlock<T> target) : ITargetBlock<T>
 227        {
 228            public DataflowMessageStatus OfferMessage(
 229                DataflowMessageHeader messageHeader,
 230                T messageValue,
 231                ISourceBlock<T>? source,
 232                bool consumeToAccept
 233            )
 234            {
 354446235                if (!consumeToAccept && outer._deliveringMessagesHandle != null && outer._deliveringMessagesHandle.Enque
 236                {
 354446237                    var res = target.OfferMessage(messageHeader, messageValue, outer, consumeToAccept);
 354446238                    if (res == DataflowMessageStatus.Accepted)
 239                    {
 297744240                        e.MarkForDispatch();
 297744241                        Interlocked.Increment(ref e.Value);
 242                    }
 354446243                    e.Dispose();
 354446244                    return res;
 245                }
 246                else
 247                {
 0248                    return target.OfferMessage(messageHeader, messageValue, outer, consumeToAccept);
 249                }
 250            }
 251
 0252            public Task Completion => target.Completion;
 253
 254            //When InnerSource is propagating completion to us, delay forwarding completion to the target until the outer block
 255            //is completed because the outer block may still have pending events to deliver.
 256            public void Complete()
 257            {
 51258                outer.Completion.PropagateCompletion(target);
 51259            }
 260
 261            public void Fault(Exception exception)
 262            {
 13263                outer.Completion.PropagateCompletion(target);
 13264            }
 265        }
 266    }
 267}