| | | 1 | | using System.Diagnostics.CodeAnalysis; |
| | | 2 | | using System.Threading.Tasks.Dataflow; |
| | | 3 | | |
| | | 4 | | namespace CounterpointCollective.Dataflow |
| | | 5 | | { |
| | | 6 | | /// <summary> |
| | | 7 | | /// A wrapper that enforces a <see cref="BoundedTargetBlock{I}.BoundedCapacity">BoundedCapacity</see> on an |
| | | 8 | | /// arbitrary <see cref="IPropagatorBlock{I,O}"/>. See also its base class <see cref="BoundedTargetBlock{I}"/>. |
| | | 9 | | /// </summary> |
| | | 10 | | |
| | | 11 | | public class BoundedPropagatorBlock<I,O> : BoundedTargetBlock<I>, IReceivableSourceBlock<O>, IResizablePropagatorBlo |
| | | 12 | | { |
| | 15682 | 13 | | private ISourceBlock<O> SourceSide { get; } |
| | | 14 | | public BoundedPropagatorBlock( |
| | | 15 | | IPropagatorBlock<I, O> inner, |
| | | 16 | | int boundedCapacity = DataflowBlockOptions.Unbounded, |
| | | 17 | | Action? onEntering = null, |
| | | 18 | | Action? onEntered = null |
| | 142 | 19 | | ) : base(inner, boundedCapacity, onEntering, onEntered) => SourceSide = CreateExit(inner); |
| | | 20 | | |
| | | 21 | | public BoundedPropagatorBlock( |
| | | 22 | | ITargetBlock<I> entrance, |
| | | 23 | | ISourceBlock<O> exit, |
| | | 24 | | int boundedCapacity = DataflowBlockOptions.Unbounded, |
| | | 25 | | Action? onEntering = null, |
| | | 26 | | Action? onEntered = null |
| | 37 | 27 | | ) : this(DataflowBlock.Encapsulate(entrance, exit), boundedCapacity, onEntering, onEntered) |
| | | 28 | | { |
| | 37 | 29 | | } |
| | | 30 | | |
| | | 31 | | /// <exclude /> |
| | | 32 | | public O? ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<O> target, out bool messageConsumed) |
| | 11 | 33 | | => SourceSide.ConsumeMessage(messageHeader, target, out messageConsumed); |
| | | 34 | | |
| | | 35 | | /// <exclude /> |
| | | 36 | | public IDisposable LinkTo(ITargetBlock<O> target, DataflowLinkOptions linkOptions) |
| | 348 | 37 | | => SourceSide.LinkTo(target, linkOptions); |
| | | 38 | | |
| | | 39 | | /// <exclude /> |
| | | 40 | | public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<O> target) |
| | 2 | 41 | | => SourceSide.ReleaseReservation(messageHeader, target); |
| | | 42 | | |
| | | 43 | | /// <exclude /> |
| | | 44 | | public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<O> target) |
| | 13 | 45 | | => SourceSide.ReserveMessage(messageHeader, target); |
| | | 46 | | |
| | | 47 | | /// <exclude /> |
| | | 48 | | public bool TryReceive(Predicate<O>? filter, [MaybeNullWhen(false)] out O item) |
| | | 49 | | { |
| | 14832 | 50 | | if (SourceSide is IReceivableSourceBlock<O> r && r.TryReceive(filter, out item)) |
| | | 51 | | { |
| | 14708 | 52 | | return true; |
| | | 53 | | } |
| | | 54 | | else |
| | | 55 | | { |
| | 124 | 56 | | item = default; |
| | 124 | 57 | | return false; |
| | | 58 | | } |
| | | 59 | | } |
| | | 60 | | |
| | | 61 | | /// <exclude /> |
| | | 62 | | public bool TryReceiveAll([NotNullWhen(true)] out IList<O>? items) { |
| | 476 | 63 | | if (SourceSide is IReceivableSourceBlock<O> r && r.TryReceiveAll(out items)) |
| | | 64 | | { |
| | 280 | 65 | | return true; |
| | | 66 | | } |
| | | 67 | | else |
| | | 68 | | { |
| | 196 | 69 | | items = null; |
| | 196 | 70 | | return false; |
| | | 71 | | } |
| | | 72 | | } |
| | | 73 | | } |
| | | 74 | | } |