< Summary

Information
Class: CounterpointCollective.Dataflow.AdmissionGateBlock<T>
Assembly: Dataflow.Composable
File(s): /builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/AdmissionGateBlock.cs
Line coverage
88%
Covered lines: 100
Uncovered lines: 13
Coverable lines: 113
Total lines: 245
Line coverage: 88.4%
Branch coverage
68%
Covered branches: 30
Total branches: 44
Branch coverage: 68.1%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
get_TargetSide() | 100% | 1 | 1 | 100%
get_CompletionSide() | 100% | 2 | 1 | 0%
get_Completion() | 100% | 1 | 1 | 100%
.ctor(...) | 100% | 1 | 1 | 100%
get_IsCompletionRequested() | 100% | 1 | 1 | 100%
get_InputCompletion() | 100% | 1 | 1 | 100%
ConfigureHooks(...) | 100% | 1 | 1 | 100%
ProcessPostponedMessages() | 100% | 2 | 2 | 100%
Fault(...) | 100% | 1 | 1 | 100%
Complete() | 100% | 1 | 1 | 100%
.ctor(...) | 100% | 1 | 1 | 100%
get_Completion() | 100% | 2 | 1 | 0%
Complete() | 100% | 2 | 1 | 0%
Fault(...) | 100% | 2 | 1 | 0%
OfferMessage(...) | 58.33% | 38 | 24 | 70.83%
Clone(...) | 100% | 1 | 1 | 100%

File(s)

/builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/AdmissionGateBlock.cs

#LineLine coverage
 1using CounterpointCollective.Dataflow.Encapsulation;
 2using CounterpointCollective.Dataflow.Internal;
 3using System.Threading.Tasks.Dataflow;
 4namespace CounterpointCollective.Dataflow
 5{
 6
 7    /// <summary>
 8    /// Hooks for controlling and observing admission into a block.
 9    /// </summary>
 10    /// <remarks>
 11    /// All hook callbacks are invoked serially: at most one hook method
 12    /// is executing at any given time. No hook is ever called concurrently
 13    /// with itself or with another hook.
 14    /// </remarks>
 15    public record AdmissionGateHooks
 16    {
 17        /// <summary>
 18        /// When this returns true, always either Entering or FailingToEnter will be called afterwards.
 19        /// </summary>
 20        public Func<bool>? MayTryToEnter { get; set; }
 21
 22        /// <summary>
 23        /// A message is successfully entering the block.
 24        /// Allows you to commit state changes accordingly.
 25        /// </summary>
 26        public Action? Entering { get; set; }
 27
 28        /// <summary>
 29        /// Called when a message has successfully entered the block, and after all state changes.
 30        /// </summary>
 31        public Action? HasEntered { get; set; }
 32
 33        /// <summary>
 34        /// Although a message was allowed to enter, it failed to do so (e.g. because it could not be consumed from the 
 35        /// Allows you to roll back any state changes made in MayTryToEnter.
 36        /// </summary>
 37        public Action? FailingToEnter { get; set; } = () => { };
 38    }
 39
 40    /// <summary>
 41    /// A dataflow block that conditionally admits messages into an inner target block based on user-defined <see cref="
 42    ///
 43    /// <para>Use this block when you want to control when/which messages are allowed to enter the block
 44    /// dynamically, e.g. by throttling rules.</para>
 45    ///
 46    /// <para>Features include:</para>
 47    /// <list type="bullet">
 48    ///   <item><description>Hooks to decide whether a message may enter, when it enters, and if entry fails.</descripti
 49    ///   <item><description>Automatic postponement of messages that are not currently allowed to enter.</description></
 50    ///   <item><description>Processing of postponed messages when state changes allow them to be admitted.</description
 51    /// </list>
 52    /// </summary>
 53    public class AdmissionGateBlock<T> : AbstractEncapsulatedTargetBlock<T>
 54    {
 55        /// <exclude/>
 112797856        protected override ITargetBlock<T> TargetSide { get; }
 57        /// <exclude/>
 058        protected override IDataflowBlock CompletionSide => this;
 59        /// <exclude/>
 22871260        public override Task Completion { get; }
 61        private readonly UnboundedTargetBlock<T> _inner;
 62
 63        private AdmissionGateHooks hooks;
 64        private readonly PostponedMessages<AdmissionGateBlock<T>, T> _postponedMessages;
 65        private readonly PostponedMessagesManager<AdmissionGateBlock<T>, T> _postponedMessagesManager;
 66
 11367        private readonly TaskCompletionSource _tcsCompletionRequest = new();
 68
 69        /// <summary>
 70        /// Whether a completion request has been made.
 71        /// </summary>
 112805472        public bool IsCompletionRequested => _tcsCompletionRequest.Task.IsCompleted;
 73
 74        /// <summary>
 75        /// Whether a completion request has been made and there are no more buffered messages waiting to enter.
 76        /// </summary>
 17777        public Task InputCompletion => _inner.InputCompletion;
 78
 79        public void ConfigureHooks(AdmissionGateHooks h)
 80        {
 1181            lock(_postponedMessagesManager.IncomingLock)
 82            {
 1183                hooks = Clone(h);
 1184                _postponedMessagesManager.ProcessPostponedMessages();
 1185            }
 1186        }
 87
 11388        public AdmissionGateBlock(ITargetBlock<T> buffer, AdmissionGateHooks h)
 89        {
 11390            TargetSide = new Target(this);
 11391            hooks = Clone(h);
 92
 11393            _inner = new UnboundedTargetBlock<T>(buffer);
 94
 11395            _postponedMessages = new(this);
 11396            _postponedMessagesManager = new(
 11397                (out PostponedMessage<AdmissionGateBlock<T>, T> p) =>
 11398                {
 21490599                    if (
 214905100                        !_postponedMessages.IsEmpty
 214905101                        && hooks.MayTryToEnter != null
 214905102                        && hooks.MayTryToEnter()
 214905103                        && _postponedMessages.TryDequeuePostponed(out p))
 113104                        {
 102271105                            return true;
 113106                        }
 113107                    else
 113108                    {
 112634109                        p = default;
 112634110                        return false;
 113111                    }
 113112                }
 113113            );
 114
 115
 113116            _postponedMessagesManager.Consume =
 113117                (in PostponedMessage<AdmissionGateBlock<T>, T> postponedMessage) =>
 113118                {
 102271119                    var succ = postponedMessage.Consume(out var msg);
 113120                    {
 113121                        //Lock to prevent hooks from being updated concurrently while we are invoking them.
 102271122                        lock (_postponedMessagesManager.IncomingLock)
 113123                        {
 102271124                            if (succ)
 113125                            {
 102271126                                hooks.Entering?.Invoke();
 102271127                                _inner.PostAsserted(msg!);
 102271128                                hooks.HasEntered?.Invoke();
 113129                            }
 113130                            else
 113131                            {
 0132                                hooks.FailingToEnter?.Invoke();
 113133                            }
 0134                        }
 113135                    }
 102384136                };
 137
 113138            var tcs = new TaskCompletionSource();
 113139            Completion = tcs.Task;
 113140            Task.Run(async () =>
 113141            {
 113142                var t1 = await Task.WhenAny(_tcsCompletionRequest.Task);
 76143                var t2 = await Task.WhenAny(_postponedMessagesManager.ShutdownAsync());
 76144                _ = t1.PropagateCompletion(_inner);
 76145                var t3 = await Task.WhenAny(_inner.Completion);
 76146                await Task.WhenAll(t3).PropagateCompletionAsync(tcs);
 189147            }, CancellationToken.None);
 148
 113149            _inner.Completion.ContinueWith(_ =>
 113150            {
 76151                if (!IsCompletionRequested)
 113152                {
 9153                    Fault(new InvalidOperationException("Source side completed unexpectedly."));
 113154                }
 189155            });
 113156        }
 157
 158        /// <summary>
 159        /// Processes any messages that have been postponed and are now eligible for handling.
 160        ///
 161        /// You typically may want to call this method after changing the state that is checked in the MayTryToEnter hoo
 162        /// transitioned to allow messages to enter.
 163        /// </summary>
 164        public void ProcessPostponedMessages()
 165        {
 316923166            if (hooks.MayTryToEnter != null)
 167            {
 315745168                _postponedMessagesManager.ProcessPostponedMessages();
 169            } else
 170            {
 171                //nothing to do because we always allow every message anyway.
 172            }
 316923173        }
 174
 175        /// <exclude/>
 176
 31177        public override void Fault(Exception exception) => _tcsCompletionRequest.TrySetException(exception);
 178
 179        /// <exclude/>
 52180        public override void Complete() => _tcsCompletionRequest.TrySetResult();
 181
 113182        private class Target(AdmissionGateBlock<T> outer) : ITargetBlock<T>
 183        {
 0184            public Task Completion => outer.Completion;
 0185            public void Complete() => outer.Complete();
 0186            public void Fault(Exception exception) => outer.Fault(exception);
 187            public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<
 188            {
 1127978189                if (outer.IsCompletionRequested)
 190                {
 0191                    return DataflowMessageStatus.DecliningPermanently;
 192                }
 1127978193                if (consumeToAccept && source == null)
 194                {
 0195                    return DataflowMessageStatus.Declined; //not allowed in protocol
 196                }
 197
 1127978198                lock (outer._postponedMessagesManager.IncomingLock)
 199                {
 1127978200                    var allow =
 1127978201                        !outer._postponedMessagesManager.IsRunningPostponedMessages &&
 1127978202                        (outer.hooks.MayTryToEnter == null || outer.hooks.MayTryToEnter());
 203
 1127978204                    if (!allow)
 205                    {
 102319206                        if (source != null)
 207                        {
 102275208                            outer._postponedMessages.Postpone(source, messageHeader);
 102275209                            return DataflowMessageStatus.Postponed;
 210                        }
 211                        else
 212                        {
 44213                            return DataflowMessageStatus.Declined;
 214                        }
 215                    }
 1025659216                    else if (consumeToAccept)
 217                    {
 0218                        var newValue = source!.ConsumeMessage(messageHeader, this, out var messageConsumed);
 0219                        if (!messageConsumed)
 220                        {
 0221                            outer.hooks.FailingToEnter?.Invoke();
 0222                            return DataflowMessageStatus.NotAvailable;
 223                        }
 224                        else
 225                        {
 0226                            messageValue = newValue!;
 227                        }
 228                    }
 1025659229                    outer.hooks.Entering?.Invoke();
 1025659230                    outer._inner.PostAsserted(messageValue);
 1025659231                    outer.hooks.HasEntered?.Invoke();
 1025659232                    return DataflowMessageStatus.Accepted;
 233                }
 1127978234            }
 235        }
 236
 124237        private static AdmissionGateHooks Clone(AdmissionGateHooks hooks) => new()
 124238        {
 124239            Entering = hooks.Entering,
 124240            FailingToEnter = hooks.FailingToEnter,
 124241            MayTryToEnter = hooks.MayTryToEnter,
 124242            HasEntered = hooks.HasEntered,
 124243        };
 244    }
 245}