< Summary

Information
Class: CounterpointCollective.Dataflow.UnboundedTargetBlock<T>
Assembly: Dataflow.Composable
File(s): /builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/UnboundedTargetBlock.cs
Line coverage
100%
Covered lines: 39
Uncovered lines: 0
Coverable lines: 39
Total lines: 85
Line coverage: 100%
Branch coverage
100%
Covered branches: 8
Total branches: 8
Branch coverage: 100%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

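| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| get_Lock() | 100% | 1 | 1 | 100% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| get_InputCompletion() | 100% | 1 | 1 | 100% |
| get_Completion() | 100% | 1 | 1 | 100% |
| Complete() | 100% | 1 | 1 | 100% |
| Fault(...) | 100% | 1 | 1 | 100% |
| OfferMessage(...) | 100% | 8 | 8 | 100% |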

File(s)

/builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/UnboundedTargetBlock.cs

# | Line | Line coverage
 1using CounterpointCollective.Dataflow.Internal;
 2using System.Diagnostics;
 3using System.Threading.Tasks.Dataflow;
 4
 5namespace CounterpointCollective.Dataflow
 6{
 7    /// <summary>
 8    /// Will always accept messages until it is explicitly told to complete or fault.
 9    /// </summary>
 10    public sealed class UnboundedTargetBlock<T> : ITargetBlock<T>
 11    {
 112819512        private object Lock { get; } = new object();
 13
 14        private readonly ITargetBlock<T> _inner;
 15        private BufferBlock<T>? buffer;
 11316        private readonly TaskCompletionSource _tcsCompletionRequest = new();
 11317        private readonly TaskCompletionSource _tcsInputCompletion = new();
 18
 17719        public Task InputCompletion => _tcsInputCompletion.Task;
 20
 21        /// <exclude/>
 18922        public Task Completion => _inner.Completion;
 23
 24        /// <exclude/>
 25        public void Complete()
 26        {
 5027            lock(Lock)
 28            {
 5029                _tcsCompletionRequest.TrySetResult();
 5030            }
 5031        }
 32
 33        /// <exclude/>
 34        public void Fault(Exception exception)
 35        {
 2636            lock (Lock)
 37            {
 2638                _tcsCompletionRequest.TrySetException(exception);
 2639            }
 2640        }
 41
 11342        public UnboundedTargetBlock(ITargetBlock<T> inner)
 43        {
 11344            _inner = inner;
 45
 11346            Task.Run(async () =>
 11347            {
 11348                await Task.WhenAny(_tcsCompletionRequest.Task);
 11349                Task inputComplete;
 11350                //We need to lock to prevent propagating completion while OfferMessage may be concurrently installing a buffer.
 7651                lock (Lock)
 11352                {
 7653                    _ = _tcsCompletionRequest.Task.PropagateCompletion(buffer == null ? _inner : buffer);
 7654                    inputComplete = (buffer == null) ? _tcsCompletionRequest.Task : buffer.Completion;
 7655                }
 7656                await inputComplete.PropagateCompletionAsync(_tcsInputCompletion);
 18857            });
 11358        }
 59
 60        /// <exclude/>
 61        public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T>? 
 62        {
 112793063            lock (Lock)
 64            {
 112793065                var b = buffer;
 112793066                if (b == null)
 67                {
 111781668                    if (!consumeToAccept && _inner.OfferMessage(messageHeader, messageValue, null, false) == DataflowMes
 69                    {
 111781170                        return DataflowMessageStatus.Accepted;
 71                    }
 72                    //Optimistic path failed. Ensure we have an always-accepting buffer installed.
 573                    b = new BufferBlock<T>();
 574                    b.LinkTo(_inner, new DataflowLinkOptions() { PropagateCompletion = true });
 75                }
 1011976                var ret = ((ITargetBlock<T>)b!).OfferMessage(messageHeader, messageValue, source, consumeToAccept);
 77                Debug.Assert(ret == DataflowMessageStatus.Accepted || _tcsCompletionRequest.Task.IsCompleted);
 78
 79                //To prevent out of order posting from the non-locking path, we can only install the buffer after successfully offering the message to it.
 1011980                buffer ??= b;
 1011981                return ret;
 82            }
 112793083        }
 84    }
 85}