< Summary

Information
Class: CounterpointCollective.Dataflow.BoundedTargetBlock<T>
Assembly: Dataflow.Composable
File(s): /builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/BoundedTargetBlock.cs
Line coverage
94%
Covered lines: 91
Uncovered lines: 5
Coverable lines: 96
Total lines: 206
Line coverage: 94.7%
Branch coverage
94%
Covered branches: 17
Total branches: 18
Branch coverage: 94.4%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
get_Lock()100%11100%
get_TargetSide()100%11100%
get_CompletionSide()100%11100%
.ctor(...)100%11100%
get_Count()100%44100%
get_AdmissionGateBlock()100%11100%
get_IsCompletionRequested()100%210%
get_InputCompletion()100%11100%
CreateHooks()100%2293.75%
CreateExit(...)100%11100%
AdjustCount(...)100%22100%
get_BoundedCapacity()100%11100%
set_BoundedCapacity(...)90%101090.9%

File(s)

/builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/BoundedTargetBlock.cs

#LineLine coverage
 1using CounterpointCollective.Collections;
 2using CounterpointCollective.Dataflow.Encapsulation;
 3using CounterpointCollective.Dataflow.Notifying;
 4using System.Threading.Tasks.Dataflow;
 5
 6namespace CounterpointCollective.Dataflow
 7{
 8    /// <summary>
 9    /// A wrapper that enforces a bounded capacity on an arbitrary <see cref="ITargetBlock{T}"/>.
 10    ///
 11    /// Amount of messages currently owned by the block is tracked by <see cref="Count"/>.
 12    /// Incoming messages are added to <see cref="Count"/>.
 13    /// Exits can be created with <see cref="CreateExit{O}"/>. Messages that leave
 14    /// an exit are deduced from the <see cref="Count"/>.
 15    /// <see cref="Count"/> can also be adjusted manually with <see cref="AdjustCount(int)"/>,
 16    /// e.g. if you make messages re-enter the block.
 17    ///
 18    /// If <see cref="Count"/> hits <see cref="BoundedCapacity"/>, no new messages will be allowed
 19    /// to enter the block, until <see cref="Count"/> drops below <see cref="BoundedCapacity"/> again.
 20    /// Then, postponed messages will be processed in FIFO order.
 21    /// </summary>
 22    public class BoundedTargetBlock<T> : AbstractEncapsulatedTargetBlock<T>
 23    {
 9624        private object Lock { get; } = new();
 25        /// <exclude />
 12666426        protected override ITargetBlock<T> TargetSide => AdmissionGateBlock;
 27        /// <exclude />
 22871128        protected override IDataflowBlock CompletionSide => AdmissionGateBlock;
 29
 30        private volatile int reserved;
 31        private volatile int count;
 32        private volatile int boundedCapacity;
 33
 8534        private readonly List<Action> processPendingEventsFromAllExits = [];
 35
 36        public int Count
 37        {
 38            get
 39            {
 489240                if (Completion.IsCompleted)
 41                {
 942                    return 0;
 43                }
 44                else
 45                {
 46                    //To prevent dirty reads, we first need to ensure all events from all exits have processed.
 1929047                    foreach (var a in processPendingEventsFromAllExits)
 48                    {
 476249                        a.Invoke();
 50                    }
 488351                    return count;
 52                }
 53            }
 54        }
 55
 56        private readonly Action? _onEntering;
 57        private readonly Action? _onEntered;
 57172158        private AdmissionGateBlock<T> AdmissionGateBlock { get; }
 59
 60        /// <summary>
 61        /// Whether a completion request has been made.
 62        /// </summary>
 063        public bool IsCompletionRequested => AdmissionGateBlock.IsCompletionRequested;
 64
 65        /// <summary>
 66        /// Whether a completion request has been made and there are no more buffered messages waiting to enter.
 67        /// </summary>
 17768        public Task InputCompletion => AdmissionGateBlock.InputCompletion;
 69
 8570        public BoundedTargetBlock(
 8571            ITargetBlock<T> inner,
 8572            int boundedCapacity,
 8573            Action? onEntering = null,
 8574            Action? onEntered = null
 8575        )
 76        {
 8577            _onEntering = onEntering;
 8578            _onEntered = onEntered;
 8579            this.boundedCapacity = boundedCapacity;
 8580            var h = CreateHooks();
 8581            AdmissionGateBlock = new AdmissionGateBlock<T>(inner, h);
 8582        }
 83
 84        private AdmissionGateHooks CreateHooks()
 85        {
 9686            var res = new AdmissionGateHooks()
 9687            {
 9688                HasEntered = _onEntered
 9689            };
 90
 9691            if (BoundedCapacity == DataflowBlockOptions.Unbounded)
 92            {
 93                //When capacity is unbounded, we only need to track the count..
 6994                res.Entering = () =>
 6995                {
 225796                    _onEntering?.Invoke();
 225797                    Interlocked.Increment(ref count);
 232698                };
 99            }
 100            else
 101            {
 102                //When there is a bound, we need to check whether messages may enter as well.
 27103                res.MayTryToEnter =
 27104                () =>
 27105                {
 134497106                    var b = boundedCapacity;
 134497107                    while (b == DataflowBlockOptions.Unbounded || reserved + count < b)
 27108                    {
 124311109                        var r = Interlocked.Increment(ref reserved);
 27110
 27111                        //Double check in case of a concurrent update, e.g. via BoundedCapacity property setter or Adjus
 124311112                        b = boundedCapacity;
 124311113                        if (b != DataflowBlockOptions.Unbounded && r + count > b)
 27114                        {
 27115                            //We are overcommitted due to concurrency. Retry.
 0116                            Interlocked.Decrement(ref reserved);
 27117                        }
 27118                        else
 27119                        {
 124311120                            return true;
 27121                        }
 27122                    }
 10186123                    return false;
 27124                };
 27125                res.FailingToEnter =
 27126                () =>
 27127                {
 0128                    Interlocked.Decrement(ref reserved);
 0129                    AdmissionGateBlock.ProcessPostponedMessages();
 27130                };
 27131                res.Entering =
 27132                () =>
 27133                {
 124311134                    _onEntering?.Invoke();
 124311135                    Interlocked.Decrement(ref reserved);
 27136                    //It's ok that temporarily another thread may see a false HaveSpace here, because
 27137                    //hook methods will never be called concurrently.
 124311138                    Interlocked.Increment(ref count);
 124338139                };
 140            }
 141
 96142            return res;
 143        }
 144
 145        /// <returns>Messages that leave the returned block are subtracted from Count.</returns>
 146        public ISourceBlock<TO> CreateExit<TO>(ISourceBlock<TO> exitBlock)
 147        {
 84148            var ret = new NotifyingSourceBlock<TO>(exitBlock, h =>
 84149            {
 215901150                h.OnDeliveringMessages = count => AdjustCount(-count);
 168151                h.ConfigureDispatching = q => q.UseSynchronousDispatch();
 84152            }
 84153            );
 154
 84155            processPendingEventsFromAllExits.Add(ret.ProcessPendingEvents);
 84156            return ret;
 157        }
 158
 159        /// <summary>
 160        /// Use when you need to manually adjust the count of messages owned by the block,
 161        /// for instance when creating re-entry points.
 162        /// </summary>
 163        public void AdjustCount(int diff)
 164        {
 316480165            Interlocked.Add(ref count, diff);
 316480166            if (diff < 0)
 167            {
 216092168                AdmissionGateBlock.ProcessPostponedMessages();
 169            }
 316480170        }
 171
 172        /// <summary>
 173        /// You can dynamically adjust the bounded capacity of the block.
 174        /// While BoundedCapacity is less than or equal to Count, no new
 175        /// messages will be allowed.
 176        ///
 177        /// Set to DataflowBlockOptions.Unbounded to disable bounding.
 178        /// </summary>
 179        public int BoundedCapacity
 180        {
 5168181            get => boundedCapacity;
 182            set
 183            {
 132184                var oldValue = Interlocked.Exchange(ref boundedCapacity, value);
 132185                if (value != oldValue)
 186                {
 131187                    if (oldValue == DataflowBlockOptions.Unbounded || value == DataflowBlockOptions.Unbounded)
 188                    {
 11189                        lock (Lock)
 190                        {
 191
 11192                            AdmissionGateBlock.ConfigureHooks(CreateHooks());
 11193                            if (value == DataflowBlockOptions.Unbounded)
 194                            {
 0195                                reserved = 0; //Forget about any pending reserved messages, because the newly installed 
 196                            }
 11197                        }
 120198                    } else if (value > oldValue)
 199                    {
 66200                        AdmissionGateBlock.ProcessPostponedMessages();
 201                    }
 202                }
 132203            }
 204        }
 205    }
 206}