< Summary

Information
Class: CounterpointCollective.Dataflow.AdmissionGateHooks
Assembly: Dataflow.Composable
File(s): /builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/AdmissionGateBlock.cs
Line coverage
80%
Covered lines: 4
Uncovered lines: 1
Coverable lines: 5
Total lines: 245
Line coverage: 80%
Branch coverage
N/A
Covered branches: 0
Total branches: 0
Branch coverage: N/A
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

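| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
|---|---|---|---|---|
| .ctor(...) | 100% | 2 | 1 | 0% |
| get_MayTryToEnter() | 100% | 1 | 1 | 100% |
| get_Entering() | 100% | 1 | 1 | 100% |
| get_HasEntered() | 100% | 1 | 1 | 100% |
| get_FailingToEnter() | 100% | 1 | 1 | 100% |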

File(s)

/builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/AdmissionGateBlock.cs

# | Line | Line coverage
 1using CounterpointCollective.Dataflow.Encapsulation;
 2using CounterpointCollective.Dataflow.Internal;
 3using System.Threading.Tasks.Dataflow;
 4namespace CounterpointCollective.Dataflow
 5{
 6
 7    /// <summary>
 8    /// Hooks for controlling and observing admission into a block.
 9    /// </summary>
 10    /// <remarks>
 11    /// All hook callbacks are invoked serially: at most one hook method
 12    /// is executing at any given time. No hook is ever called concurrently
 13    /// with itself or with another hook.
 14    /// </remarks>
 015    public record AdmissionGateHooks
 16    {
 17        /// <summary>
 18        /// Asked whether a message may currently try to enter. When this returns true, always either Entering or FailingToEnter will be called afterwards.
           /// When left null, offered messages are not subjected to this check.
 19        /// </summary>
 279199520        public Func<bool>? MayTryToEnter { get; set; }
 21
 22        /// <summary>
 23        /// A message is successfully entering the block; invoked just before the message is posted to the inner block.
 24        /// Allows you to commit state changes accordingly.
 25        /// </summary>
 112830226        public Action? Entering { get; set; }
 27
 28        /// <summary>
 29        /// Called when a message has successfully entered the block (it has been posted to the inner block), and after all state changes.
 30        /// </summary>
 112827431        public Action? HasEntered { get; set; }
 32
 33        /// <summary>
 34        /// Although a message was allowed to enter, it failed to do so (e.g. because it could not be consumed from the source).
 35        /// Allows you to roll back any state changes made in MayTryToEnter.
           /// Defaults to a no-op delegate rather than null.
 36        /// </summary>
 55137        public Action? FailingToEnter { get; set; } = () => { };
 38    }
 39
 40    /// <summary>
 41    /// A dataflow block that conditionally admits messages into an inner target block based on user-defined <see cref="AdmissionGateHooks"/>.
 42    ///
 43    /// <para>Use this block when you want to control when/which messages are allowed to enter the block
 44    /// dynamically, e.g. by throttling rules.</para>
 45    ///
 46    /// <para>Features include:</para>
 47    /// <list type="bullet">
 48    ///   <item><description>Hooks to decide whether a message may enter, when it enters, and if entry fails.</description></item>
 49    ///   <item><description>Automatic postponement of messages that are not currently allowed to enter.</description></item>
 50    ///   <item><description>Processing of postponed messages when state changes allow them to be admitted.</description></item>
 51    /// </list>
 52    /// </summary>
 53    public class AdmissionGateBlock<T> : AbstractEncapsulatedTargetBlock<T>
 54    {
 55        /// <exclude/>
 56        protected override ITargetBlock<T> TargetSide { get; }
 57        /// <exclude/>
 58        protected override IDataflowBlock CompletionSide => this;
 59        /// <exclude/>
 60        public override Task Completion { get; }
           // Wraps the buffer passed to the constructor; admitted messages are posted here.
 61        private readonly UnboundedTargetBlock<T> _inner;
 62
           // Private copy of the caller-supplied hooks (see Clone); replaced by ConfigureHooks under IncomingLock.
 63        private AdmissionGateHooks hooks;
 64        private readonly PostponedMessages<AdmissionGateBlock<T>, T> _postponedMessages;
 65        private readonly PostponedMessagesManager<AdmissionGateBlock<T>, T> _postponedMessagesManager;
 66
           // Set to completed by Complete() and to faulted by Fault(); records that completion was requested.
 67        private readonly TaskCompletionSource _tcsCompletionRequest = new();
 68
 69        /// <summary>
 70        /// Whether a completion request has been made (via <see cref="Complete"/> or <see cref="Fault"/>).
 71        /// </summary>
 72        public bool IsCompletionRequested => _tcsCompletionRequest.Task.IsCompleted;
 73
 74        /// <summary>
 75        /// Whether a completion request has been made and there are no more buffered messages waiting to enter.
           /// This is the inner block's <c>InputCompletion</c> task, exposed unchanged.
 76        /// </summary>
 77        public Task InputCompletion => _inner.InputCompletion;
 78
           /// <summary>
           /// Replaces the active hooks with a copy of <paramref name="h"/> and then processes postponed messages,
           /// both while holding the incoming lock.
           /// </summary>
           /// <param name="h">The new hooks; its delegates are copied, so later changes to <paramref name="h"/> are not observed.</param>
 79        public void ConfigureHooks(AdmissionGateHooks h)
 80        {
 81            lock(_postponedMessagesManager.IncomingLock)
 82            {
 83                hooks = Clone(h);
 84                _postponedMessagesManager.ProcessPostponedMessages();
 85            }
 86        }
 87
           /// <summary>
           /// Creates a gate in front of <paramref name="buffer"/>, using a copy of the hooks in <paramref name="h"/>.
           /// </summary>
           /// <param name="buffer">The target block that receives admitted messages (wrapped in an <c>UnboundedTargetBlock</c>).</param>
           /// <param name="h">The initial hooks; its delegates are copied.</param>
 88        public AdmissionGateBlock(ITargetBlock<T> buffer, AdmissionGateHooks h)
 89        {
 90            TargetSide = new Target(this);
 91            hooks = Clone(h);
 92
 93            _inner = new UnboundedTargetBlock<T>(buffer);
 94
 95            _postponedMessages = new(this);
               // Dequeue predicate handed to the manager: yields the next postponed message only when
               // something is queued and the MayTryToEnter hook currently grants admission.
               // NOTE(review): this requires a non-null MayTryToEnter, whereas OfferMessage treats null as
               // "always allow". Messages postponed while IsRunningPostponedMessages was true, or before
               // ConfigureHooks cleared the hook, would not be dequeued here — confirm against PostponedMessagesManager.
 96            _postponedMessagesManager = new(
 97                (out PostponedMessage<AdmissionGateBlock<T>, T> p) =>
 98                {
 99                    if (
 100                        !_postponedMessages.IsEmpty
 101                        && hooks.MayTryToEnter != null
 102                        && hooks.MayTryToEnter()
 103                        && _postponedMessages.TryDequeuePostponed(out p))
 104                        {
 105                            return true;
 106                        }
 107                    else
 108                    {
 109                        p = default;
 110                        return false;
 111                    }
 112                }
 113            );
 114
 115
                // Consumer for a dequeued postponed message: the message is consumed from its source first,
                // then the matching hooks run under the incoming lock.
 116            _postponedMessagesManager.Consume =
 117                (in PostponedMessage<AdmissionGateBlock<T>, T> postponedMessage) =>
 118                {
 119                    var succ = postponedMessage.Consume(out var msg);
 120                    {
 121                        //Lock to prevent hooks from being updated concurrently while we are invoking them.
 122                        lock (_postponedMessagesManager.IncomingLock)
 123                        {
 124                            if (succ)
 125                            {
                                    // Same Entering -> post -> HasEntered order as the direct path in OfferMessage.
 126                                hooks.Entering?.Invoke();
 127                                _inner.PostAsserted(msg!);
 128                                hooks.HasEntered?.Invoke();
 129                            }
 130                            else
 131                            {
                                    // Consumption failed after MayTryToEnter granted admission: let the hook roll back.
 132                                hooks.FailingToEnter?.Invoke();
 133                            }
 134                        }
 135                    }
 136                };
 137
                // Completion pipeline, run in the background:
                //  1. wait for a completion request (Complete or Fault); Task.WhenAny returns the finished task
                //     instead of throwing, so a faulted request does not abort the pipeline;
                //  2. shut down the postponed-message manager;
                //  3. pass the request task to PropagateCompletion for _inner (helper defined elsewhere;
                //     presumably completes or faults _inner to match — confirm);
                //  4. wait for _inner to finish and propagate its outcome to this block's Completion.
 138            var tcs = new TaskCompletionSource();
 139            Completion = tcs.Task;
 140            Task.Run(async () =>
 141            {
 142                var t1 = await Task.WhenAny(_tcsCompletionRequest.Task);
 143                var t2 = await Task.WhenAny(_postponedMessagesManager.ShutdownAsync());
 144                _ = t1.PropagateCompletion(_inner);
 145                var t3 = await Task.WhenAny(_inner.Completion);
 146                await Task.WhenAll(t3).PropagateCompletionAsync(tcs);
 147            }, CancellationToken.None);
 148
                // If the inner block finishes without a completion request having been made on this block,
                // record a fault as the completion request so the pipeline above can proceed.
 149            _inner.Completion.ContinueWith(_ =>
 150            {
 151                if (!IsCompletionRequested)
 152                {
 153                    Fault(new InvalidOperationException("Source side completed unexpectedly."));
 154                }
 155            });
 156        }
 157
 158        /// <summary>
 159        /// Processes any messages that have been postponed and are now eligible for handling.
 160        ///
 161        /// You typically may want to call this method after changing the state that is checked in the MayTryToEnter hook, i.e. once that state has
 162        /// transitioned to allow messages to enter.
 163        /// </summary>
 164        public void ProcessPostponedMessages()
 165        {
 166            if (hooks.MayTryToEnter != null)
 167            {
 168                _postponedMessagesManager.ProcessPostponedMessages();
 169            } else
 170            {
 171                //nothing to do because we always allow every message anyway.
 172            }
 173        }
 174
 175        /// <exclude/>
 176
            // Records the exception as the completion request; has no effect if a request was already made (TrySet*).
 177        public override void Fault(Exception exception) => _tcsCompletionRequest.TrySetException(exception);
 178
 179        /// <exclude/>
            // Records a normal completion request; has no effect if a request was already made (TrySet*).
 180        public override void Complete() => _tcsCompletionRequest.TrySetResult();
 181
            /// <summary>
            /// The <see cref="ITargetBlock{T}"/> exposed as <c>TargetSide</c>: completion members forward to the
            /// outer block, and <c>OfferMessage</c> applies the admission hooks.
            /// </summary>
 182        private class Target(AdmissionGateBlock<T> outer) : ITargetBlock<T>
 183        {
 184            public Task Completion => outer.Completion;
 185            public void Complete() => outer.Complete();
 186            public void Fault(Exception exception) => outer.Fault(exception);
                // NOTE(review): the signature line below is cut off in this listing; the body also uses
                // parameters named source and consumeToAccept.
 187            public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<
 188            {
                    // Once completion has been requested, nothing more is accepted.
 189                if (outer.IsCompletionRequested)
 190                {
 191                    return DataflowMessageStatus.DecliningPermanently;
 192                }
 193                if (consumeToAccept && source == null)
 194                {
 195                    return DataflowMessageStatus.Declined; //not allowed in protocol
 196                }
 197
 198                lock (outer._postponedMessagesManager.IncomingLock)
 199                {
                        // && short-circuits: MayTryToEnter is not invoked while postponed messages are being run,
                        // so a true result from it is always followed by Entering or FailingToEnter below.
 200                    var allow =
 201                        !outer._postponedMessagesManager.IsRunningPostponedMessages &&
 202                        (outer.hooks.MayTryToEnter == null || outer.hooks.MayTryToEnter());
 203
 204                    if (!allow)
 205                    {
                            // Not admitted now: postpone when there is a source to come back to, otherwise decline.
 206                        if (source != null)
 207                        {
 208                            outer._postponedMessages.Postpone(source, messageHeader);
 209                            return DataflowMessageStatus.Postponed;
 210                        }
 211                        else
 212                        {
 213                            return DataflowMessageStatus.Declined;
 214                        }
 215                    }
 216                    else if (consumeToAccept)
 217                    {
                            // The source requires the message to be consumed before it counts as accepted.
 218                        var newValue = source!.ConsumeMessage(messageHeader, this, out var messageConsumed);
 219                        if (!messageConsumed)
 220                        {
                                // Admission was granted but the message is gone: let the hook roll back.
 221                            outer.hooks.FailingToEnter?.Invoke();
 222                            return DataflowMessageStatus.NotAvailable;
 223                        }
 224                        else
 225                        {
                                // Use the value returned by ConsumeMessage instead of the offered one.
 226                            messageValue = newValue!;
 227                        }
 228                    }
 229                    outer.hooks.Entering?.Invoke();
 230                    outer._inner.PostAsserted(messageValue);
 231                    outer.hooks.HasEntered?.Invoke();
 232                    return DataflowMessageStatus.Accepted;
 233                }
 234            }
 235        }
 236
            /// <summary>
            /// Returns a new <see cref="AdmissionGateHooks"/> carrying the same four delegates as <paramref name="hooks"/>,
            /// so that later changes to the caller's instance do not affect this block.
            /// </summary>
 237        private static AdmissionGateHooks Clone(AdmissionGateHooks hooks) => new()
 238        {
 239            Entering = hooks.Entering,
 240            FailingToEnter = hooks.FailingToEnter,
 241            MayTryToEnter = hooks.MayTryToEnter,
 242            HasEntered = hooks.HasEntered,
 243        };
 244    }
 245}