< Summary

Information
Class: CounterpointCollective.Dataflow.DeferredBlock<T1, T2>
Assembly: Dataflow.Composable
File(s): /builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/DeferredBlock.cs
Line coverage
76%
Covered lines: 121
Uncovered lines: 37
Coverable lines: 158
Total lines: 322
Line coverage: 76.5%
Branch coverage
39%
Covered branches: 15
Total branches: 38
Branch coverage: 39.4%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
get_Lock()100%11100%
.ctor(...)58.33%121294.73%
get_Target()100%11100%
get_RealLink()100%11100%
.ctor(...)100%11100%
get_Completion()100%11100%
Complete()100%11100%
Fault(...)100%210%
OfferMessage(...)62.5%9876.92%
LinkTo(...)50%8664.7%
ConsumeMessage(...)0%2040%
ReleaseReservation(...)0%2040%
ReserveMessage(...)0%2040%

File(s)

/builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/DeferredBlock.cs

#LineLine coverage
 1using CounterpointCollective.Dataflow.Internal;
 2using CounterpointCollective.Utilities;
 3using System.Threading.Tasks.Dataflow;
 4
 5namespace CounterpointCollective.Dataflow
 6{
 7    /// <summary>
 8    /// A dataflow block that lets you construct and wire up an entire pipeline
 9    /// *before* the actual underlying block is available.
 10    ///
 11    /// <para>
 12    /// Use <b>DeferredBlock</b> when the real <see cref="IPropagatorBlock{I,O}"/>
 13    /// requires asynchronous setup before it can be created (e.g. initializing a
 14    /// database connection, loading configuration, performing I/O, warming caches,
 15    /// etc.).
 16    /// </para>
 17    ///
 18    /// <para>
 19    /// With this block you can:
 20    /// </para>
 21    /// <list type="bullet">
 22    ///   <item><description>
 23    ///     Set up all <see cref="LinkTo"/> connections immediately.
 24    ///     These links are recorded and only activated once the inner block is created.
 25    ///   </description></item>
 26    ///   <item><description>
 27    ///     Start building the rest of your pipeline without having to await the
 28    ///     initialization task.
 29    ///   </description></item>
 30    ///   <item><description>
 31    ///     Accept incoming messages during the initialization phase; they are stored
 32    ///     and offered to the inner block once it is ready.
 33    ///   </description></item>
 34    ///   <item><description>
 35    ///     Request completion or fault at any time; the request is applied either
 36    ///     immediately (if the inner block already exists) or propagated correctly
 37    ///     once initialization finishes.
 38    ///   </description></item>
 39    /// </list>
 40    ///
 41    /// <para>
 42    /// As soon as the asynchronous factory produces the real block:
 43    /// </para>
 44    /// <list type="bullet">
 45    ///   <item><description>
 46    ///     Pending links are created.
 47    ///   </description></item>
 48    ///   <item><description>
 49    ///     Postponed messages are forwarded.
 50    ///   </description></item>
 51    ///   <item><description>
 52    ///     Any previously requested completion or fault is propagated.
 53    ///   </description></item>
 54    /// </list>
 55    ///
 56    /// <para>
 57    /// After the inner block has been created, <b>DeferredBlock</b> behaves exactly as
 58    /// a transparent wrapper around it.
 59    /// </para>
 60    /// </summary>
 61    public class DeferredBlock<I, O> : IPropagatorBlock<I, O>
 62    {
 2663        private object Lock { get; } = new();
 64        private readonly TaskCompletionSource _completionRequest;
 65        private readonly PostponedMessages<DeferredBlock<I, O>, I> _postponedMessages;
 566        private readonly Dictionary<ISourceBlock<I>, I> _postponedValues = [];
 67
 1268        private record Link(ITargetBlock<O> Target, DataflowLinkOptions Options)
 69        {
 170            public IDisposable? RealLink { get; set; }
 71        }
 72
 573        private readonly List<Link> _pendingLinks = [];
 74
 75
 76        private volatile IPropagatorBlock<I, O>? inner;
 77
 78        public DeferredBlock(Task<IPropagatorBlock<I, O>> factory, CancellationToken cancellationToken = default)
 1079            : this(_ => factory, cancellationToken)
 80        {
 581        }
 82
 583        public DeferredBlock(Func<CancellationToken, Task<IPropagatorBlock<I, O>>> factory, CancellationToken cancellati
 84        {
 585            _postponedMessages = new(this);
 86
 587            var tcs = new TaskCompletionSource();
 588            Completion = tcs.Task;
 89
 90#pragma warning disable CA2000 // Dispose objects before losing scope
 591            var ctsCancelFactory = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
 92#pragma warning restore CA2000 // Dispose objects before losing scope
 93
 594            Task.Run(async () =>
 595            {
 596                var factoryTask = await Task.WhenAny(factory(ctsCancelFactory.Token));
 597                if (factoryTask.IsFaulted)
 598                {
 199                    tcs.SetException(factoryTask.Exception);
 5100                }
 4101                else if (factoryTask.IsCanceled)
 5102                {
 2103                    if (cancellationToken.IsCancellationRequested)
 5104                    {
 5105                        //user requested cancellation
 1106                        tcs.SetCanceled();
 5107                    } else
 5108                    {
 5109                        //Completion requested before factory finished.
 1110                        tcs.SetResult();
 5111                    }
 5112                }
 5113                else //Factory done.
 5114                {
 2115                    var block = await factoryTask;
 2116                    lock (Lock)
 5117                    {
 5118                        //Create all pending links
 6119                        foreach (var l in _pendingLinks)
 5120                        {
 1121                            l.RealLink = block.LinkTo(l.Target, l.Options);
 5122                        }
 2123                        _pendingLinks.Clear();
 5124                        //Offer the postponed messages
 4125                        while (_postponedMessages.TryDequeuePostponed(out var p))
 5126                        {
 2127                            var res = block.OfferMessage(p.MessageHeader, _postponedValues[p.SourceBlock], p.SourceBlock
 2128                            if (res == DataflowMessageStatus.DecliningPermanently)
 5129                            {
 5130                                break; //Block already completed, stop offering.
 5131                            }
 5132                        }
 2133                        _postponedValues.Clear();
 2134                        inner = block;
 2135                    }
 2136                    await block.Completion.PropagateCompletionAsync(tcs);
 5137                }
 5138                ctsCancelFactory.Dispose();
 10139            }, CancellationToken.None);
 140
 5141            _completionRequest = new TaskCompletionSource();
 5142            _completionRequest.Task.ContinueWith(t =>
 5143            {
 2144                lock(Lock)
 5145                {
 2146                    if (inner == null)
 5147                    {
 5148                        //Completion requested before factory finished.
 5149                        //Try to stop the factory task
 0150                        ctsCancelFactory.Cancel();
 5151                        //Propagate completion to the targets of the links with PropagateCompletion.
 0152                        foreach (var l in _pendingLinks.Where(l => l.Options.PropagateCompletion))
 5153                        {
 0154                            _completionRequest.Task.PropagateCompletion(l.Target);
 5155                        }
 0156                        _pendingLinks.Clear();
 0157                        _postponedValues.Clear();
 5158                    } else
 5159                    {
 5160                        //Otherwise complete the block.
 2161                        _completionRequest.Task.PropagateCompletion(inner);
 5162                    }
 2163                }
 7164            }, CancellationToken.None);
 165
 5166            Completion.ContinueWith(t =>
 5167            {
 5168                lock (Lock)
 5169                {
 5170                    if (inner == null)
 5171                    {
 3172                        _postponedMessages.Clear();
 3173                        _postponedValues.Clear();
 5174                        //If we completed before the inner block was created, complete the targets of the links with Pro
 15175                        foreach (var l in _pendingLinks.Where(l => l.Options.PropagateCompletion))
 5176                        {
 3177                            t.PropagateCompletion(l.Target);
 5178                        }
 3179                        _pendingLinks.Clear();
 5180                    }
 5181                }
 10182            }, CancellationToken.None);
 5183        }
 184
 185        /// <exclude/>
 17186        public Task Completion { get; }
 187
 188        /// <exclude/>
 189        public void Complete()
 190        {
 2191            lock (Lock)
 192            {
 2193                _completionRequest.TrySetResult();
 2194            }
 2195        }
 196
 197        /// <exclude/>
 198        public void Fault(Exception exception)
 199        {
 0200            lock (Lock)
 201            {
 0202                _completionRequest.TrySetException(exception);
 0203            }
 0204        }
 205
 206        /// <exclude/>
 207        public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, I messageValue, ISourceBlock<I>? 
 208        {
 23209            if (inner == null)
 210            {
 5211                lock(Lock)
 212                {
 5213                    if (inner != null)
 214                    {
 0215                        return inner.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
 5216                    } else if (source == null)
 217                    {
 0218                        return DataflowMessageStatus.Declined;
 5219                    } else if (_completionRequest.Task.IsCompleted)
 220                    {
 0221                        return DataflowMessageStatus.DecliningPermanently;
 222                    } else
 223                    {
 5224                        _postponedMessages.Postpone(source, messageHeader);
 5225                        _postponedValues[source] = messageValue;
 5226                        return DataflowMessageStatus.Postponed;
 227                    }
 228                }
 229            } else
 230            {
 18231                return inner.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
 232            }
 5233        }
 234
 235        /// <exclude/>
 236        public IDisposable LinkTo(ITargetBlock<O> target, DataflowLinkOptions linkOptions)
 237        {
 5238            if (inner == null)
 239            {
 5240                lock(Lock)
 241                {
 5242                    if (inner == null)
 243                    {
 4244                        var link = new Link(target, linkOptions);
 4245                        _pendingLinks.Add(link);
 4246                        return new ActionDisposable(() =>
 4247                        {
 0248                            lock (Lock)
 4249                            {
 0250                                _pendingLinks.Remove(link);
 0251                                link.RealLink?.Dispose();
 0252                                link.RealLink = null;
 0253                            }
 4254                        });
 255                    }
 256                    else
 257                    {
 1258                        return inner.LinkTo(target, linkOptions);
 259                    }
 260                }
 261            } else
 262            {
 0263                return inner.LinkTo(target, linkOptions);
 264            }
 5265        }
 266
 267        /// <exclude/>
 268        public O? ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<O> target, out bool messageConsumed)
 269        {
 0270            if (inner == null)
 271            {
 0272                lock (Lock)
 273                {
 0274                    if (inner == null)
 275                    {
 0276                        messageConsumed = false;
 0277                        return default!;
 278                    }
 279                    else
 280                    {
 0281                        return inner.ConsumeMessage(messageHeader, target, out messageConsumed);
 282                    }
 283                }
 284            }
 285            else
 286            {
 0287                return inner.ConsumeMessage(messageHeader, target, out messageConsumed);
 288            }
 0289        }
 290
 291        /// <exclude/>
 292        public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<O> target)
 293        {
 0294            if (inner == null)
 295            {
 0296                lock (Lock)
 297                {
 0298                    inner?.ReleaseReservation(messageHeader, target);
 0299                }
 300            }
 301            else
 302            {
 0303                inner.ReleaseReservation(messageHeader, target);
 304            }
 0305        }
 306
 307        /// <exclude/>
 308        public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<O> target)
 309        {
 0310            if (inner == null)
 311            {
 0312                lock(Lock)
 313                {
 0314                    return inner?.ReserveMessage(messageHeader, target) ?? false;
 315                }
 316            } else
 317            {
 0318                return inner.ReserveMessage(messageHeader, target);
 319            }
 0320        }
 321    }
 322}