| | | 1 | | using CounterpointCollective.Dataflow.Internal; |
| | | 2 | | using System.Diagnostics; |
| | | 3 | | using System.Threading.Tasks.Dataflow; |
| | | 4 | | |
| | | 5 | | namespace CounterpointCollective.Dataflow |
| | | 6 | | { |
| | | 7 | | /// <summary> |
| | | 8 | | /// Will always accept messages until it is explicitly told to complete or fault. |
| | | 9 | | /// </summary> |
| | | 10 | | public sealed class UnboundedTargetBlock<T> : ITargetBlock<T> |
| | | 11 | | { |
| | 1128195 | 12 | | private object Lock { get; } = new object(); |
| | | 13 | | |
| | | 14 | | private readonly ITargetBlock<T> _inner; |
| | | 15 | | private BufferBlock<T>? buffer; |
| | 113 | 16 | | private readonly TaskCompletionSource _tcsCompletionRequest = new(); |
| | 113 | 17 | | private readonly TaskCompletionSource _tcsInputCompletion = new(); |
| | | 18 | | |
| | 177 | 19 | | public Task InputCompletion => _tcsInputCompletion.Task; |
| | | 20 | | |
| | | 21 | | /// <exclude/> |
| | 189 | 22 | | public Task Completion => _inner.Completion; |
| | | 23 | | |
| | | 24 | | /// <exclude/> |
| | | 25 | | public void Complete() |
| | | 26 | | { |
| | 50 | 27 | | lock(Lock) |
| | | 28 | | { |
| | 50 | 29 | | _tcsCompletionRequest.TrySetResult(); |
| | 50 | 30 | | } |
| | 50 | 31 | | } |
| | | 32 | | |
| | | 33 | | /// <exclude/> |
| | | 34 | | public void Fault(Exception exception) |
| | | 35 | | { |
| | 26 | 36 | | lock (Lock) |
| | | 37 | | { |
| | 26 | 38 | | _tcsCompletionRequest.TrySetException(exception); |
| | 26 | 39 | | } |
| | 26 | 40 | | } |
| | | 41 | | |
| | 113 | 42 | | public UnboundedTargetBlock(ITargetBlock<T> inner) |
| | | 43 | | { |
| | 113 | 44 | | _inner = inner; |
| | | 45 | | |
| | 113 | 46 | | Task.Run(async () => |
| | 113 | 47 | | { |
| | 113 | 48 | | await Task.WhenAny(_tcsCompletionRequest.Task); |
| | 113 | 49 | | Task inputComplete; |
| | 113 | 50 | | //We need to lock to prevent propagating completion while OfferMessage may be concurrently installing a |
| | 76 | 51 | | lock (Lock) |
| | 113 | 52 | | { |
| | 76 | 53 | | _ = _tcsCompletionRequest.Task.PropagateCompletion(buffer == null ? _inner : buffer); |
| | 76 | 54 | | inputComplete = (buffer == null) ? _tcsCompletionRequest.Task : buffer.Completion; |
| | 76 | 55 | | } |
| | 76 | 56 | | await inputComplete.PropagateCompletionAsync(_tcsInputCompletion); |
| | 188 | 57 | | }); |
| | 113 | 58 | | } |
| | | 59 | | |
| | | 60 | | /// <exclude/> |
| | | 61 | | public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T>? |
| | | 62 | | { |
| | 1127930 | 63 | | lock (Lock) |
| | | 64 | | { |
| | 1127930 | 65 | | var b = buffer; |
| | 1127930 | 66 | | if (b == null) |
| | | 67 | | { |
| | 1117816 | 68 | | if (!consumeToAccept && _inner.OfferMessage(messageHeader, messageValue, null, false) == DataflowMes |
| | | 69 | | { |
| | 1117811 | 70 | | return DataflowMessageStatus.Accepted; |
| | | 71 | | } |
| | | 72 | | //Optimistic path failed. Ensure we have an always-accepting buffer installed. |
| | 5 | 73 | | b = new BufferBlock<T>(); |
| | 5 | 74 | | b.LinkTo(_inner, new DataflowLinkOptions() { PropagateCompletion = true }); |
| | | 75 | | } |
| | 10119 | 76 | | var ret = ((ITargetBlock<T>)b!).OfferMessage(messageHeader, messageValue, source, consumeToAccept); |
| | | 77 | | Debug.Assert(ret == DataflowMessageStatus.Accepted || _tcsCompletionRequest.Task.IsCompleted); |
| | | 78 | | |
| | | 79 | | //To prevent out of order posting from the non-locking path, we can only install the buffer after succes |
| | 10119 | 80 | | buffer ??= b; |
| | 10119 | 81 | | return ret; |
| | | 82 | | } |
| | 1127930 | 83 | | } |
| | | 84 | | } |
| | | 85 | | } |