| | | 1 | | using System; |
| | | 2 | | using System.Collections.Generic; |
| | | 3 | | using System.Diagnostics.CodeAnalysis; |
| | | 4 | | using System.Runtime.CompilerServices; |
| | | 5 | | using System.Threading.Tasks; |
| | | 6 | | using System.Threading.Tasks.Dataflow; |
| | | 7 | | |
| | | 8 | | namespace CounterpointCollective.Dataflow |
| | | 9 | | { |
| | | 10 | | /// <summary> |
| | | 11 | | /// Important: transform must be idempotent and cheap! It will be run multiple times for the same input in some scenarios. |
| | | 12 | | /// </summary> |
| | | 13 | | public sealed class SynchronousTransformingBlock<I, O>: IReceivableSourceBlock<O> |
| | | 14 | | { |
/// <summary>Pairs an input message with the output computed for it.</summary>
private record LastMessage(I Message, O TransformedMessage);

/// <summary>Upstream block whose messages are transformed and exposed by this block.</summary>
private readonly ISourceBlock<I> _sourceBlock;

/// <summary>Target identity passed to <see cref="_sourceBlock"/> on every reserve, consume and release call.</summary>
private readonly DummyTarget _target;

/// <summary>Transform wrapper assigned in the constructor; all members go through it rather than the raw delegate.</summary>
private Func<I, O> Transform { get; }

/// <summary>Guards <see cref="reservationTarget"/> together with the upstream reservation calls.</summary>
private object OutgoingLock { get; } = new();

/// <summary>Downstream target recorded as holding a reservation, or null when none is recorded.</summary>
private ITargetBlock<O>? reservationTarget;
| | | 21 | | |
| | 5 | 22 | | private static readonly IEqualityComparer<I> Comparer = typeof(I).IsValueType ? EqualityComparer<I>.Default : ne |
| | | 23 | | |
/// <summary>
/// Equality comparer that treats two inputs as equal only when they are the same object.
/// </summary>
private class ReferenceComparer : IEqualityComparer<I>
{
    /// <summary>Identity comparison; two nulls compare equal.</summary>
    public bool Equals(I? x, I? y)
    {
        return ReferenceEquals(x, y);
    }

    /// <summary>Identity-based hash code, independent of any GetHashCode override on the instance.</summary>
    public int GetHashCode([DisallowNull] I obj)
    {
        return RuntimeHelpers.GetHashCode(obj);
    }
}
| | | 29 | | |
/// <summary>Most recently transformed input with its output; null until the first message is transformed.</summary>
private LastMessage? lastMessage;

/// <summary>Serializes reads and writes of <see cref="lastMessage"/> inside the transform wrapper.</summary>
private object Lock { get; } = new();
| | | 32 | | |
/// <summary>
/// Wraps <paramref name="sourceBlock"/> so that every message it hands out is first passed
/// through <paramref name="transform"/>.
/// </summary>
/// <param name="sourceBlock">Upstream block that supplies the untransformed messages.</param>
/// <param name="transform">
/// Mapping from input to output. It is invoked while an internal lock is held and its result
/// is cached for the most recent input only, so it may run again for an input seen earlier.
/// </param>
/// <exception cref="ArgumentNullException">
/// <paramref name="sourceBlock"/> or <paramref name="transform"/> is null.
/// </exception>
public SynchronousTransformingBlock(ISourceBlock<I> sourceBlock, Func<I, O> transform)
{
    // Reject nulls here; otherwise they only surface as a NullReferenceException
    // when the first message flows through the pipeline.
    if (sourceBlock is null)
    {
        throw new ArgumentNullException(nameof(sourceBlock));
    }

    if (transform is null)
    {
        throw new ArgumentNullException(nameof(transform));
    }

    _sourceBlock = sourceBlock;
    _target = new(this);
    Transform = message =>
    {
        lock (Lock)
        {
            // Single-entry cache: recompute only when the incoming message differs
            // (as judged by Comparer) from the one transformed last.
            var cached = lastMessage;
            if (cached is null || !Comparer.Equals(message, cached.Message))
            {
                cached = new LastMessage(message, transform(message));
                lastMessage = cached;
            }

            return cached.TransformedMessage;
        }
    };
}
| | | 53 | | |
/// <summary>
/// Links the upstream block to <paramref name="target"/> through an adapter that transforms
/// each offered message before passing it on.
/// </summary>
/// <param name="target">Downstream block that receives the transformed messages.</param>
/// <param name="linkOptions">Options forwarded unchanged to the upstream block's LinkTo.</param>
/// <returns>The link handle returned by the upstream block.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="target"/> or <paramref name="linkOptions"/> is null.
/// </exception>
public IDisposable LinkTo(ITargetBlock<O> target, DataflowLinkOptions linkOptions)
{
    // Without this check a null target would be wrapped and linked successfully,
    // and only fail once the upstream block offers its first message.
    if (target is null)
    {
        throw new ArgumentNullException(nameof(target));
    }

    if (linkOptions is null)
    {
        throw new ArgumentNullException(nameof(linkOptions));
    }

    return _sourceBlock.LinkTo(new LinkTarget(this, target), linkOptions);
}
| | | 59 | | |
/// <summary>The upstream block's completion task, returned as-is.</summary>
public Task Completion
{
    get { return _sourceBlock.Completion; }
}

/// <summary>Forwards the completion request to the upstream block.</summary>
public void Complete()
{
    _sourceBlock.Complete();
}

/// <summary>Forwards <paramref name="exception"/> to the upstream block's Fault.</summary>
public void Fault(Exception exception)
{
    _sourceBlock.Fault(exception);
}
| | | 65 | | |
/// <summary>
/// Consumes the message identified by <paramref name="messageHeader"/> from the upstream block
/// and returns its transformed value.
/// </summary>
/// <param name="messageHeader">Header of the message being consumed.</param>
/// <param name="target">Downstream target requesting the message.</param>
/// <param name="messageConsumed">
/// Set to true when the upstream block handed the message over; false otherwise, including
/// when a different target is recorded as holding a reservation.
/// </param>
/// <returns>The transformed message when consumed; otherwise the default value.</returns>
public O? ConsumeMessage(
    DataflowMessageHeader messageHeader,
    ITargetBlock<O> target,
    out bool messageConsumed
)
{
    messageConsumed = false;
    I? upstreamValue = default;
    lock (OutgoingLock)
    {
        // A reservation recorded for another target blocks consumption entirely.
        if (reservationTarget is null || reservationTarget == target)
        {
            upstreamValue = _sourceBlock.ConsumeMessage(messageHeader, _target, out messageConsumed);

            // Drop the recorded reservation only when the message was actually consumed.
            // After a failed consume the holder must still be able to release through
            // ReleaseReservation, and other targets must stay locked out meanwhile.
            if (messageConsumed)
            {
                reservationTarget = null;
            }
        }
    }

    // The transform runs outside OutgoingLock; it takes its own lock internally.
    return messageConsumed ? Transform(upstreamValue!) : default;
}
| | | 84 | | |
/// <summary>
/// Releases the upstream reservation on behalf of <paramref name="target"/>. Calls from any
/// target other than the recorded reservation holder are ignored.
/// </summary>
/// <param name="messageHeader">Header of the reserved message.</param>
/// <param name="target">Downstream target giving up its reservation.</param>
public void ReleaseReservation(
    DataflowMessageHeader messageHeader,
    ITargetBlock<O> target
)
{
    lock (OutgoingLock)
    {
        if (reservationTarget != target)
        {
            return;
        }

        // Release upstream first; the recorded holder is cleared only if that call returns.
        _sourceBlock.ReleaseReservation(messageHeader, _target);
        reservationTarget = null;
    }
}
| | | 99 | | |
/// <summary>
/// Attempts to reserve the message identified by <paramref name="messageHeader"/> on the
/// upstream block for <paramref name="target"/>.
/// </summary>
/// <param name="messageHeader">Header of the message to reserve.</param>
/// <param name="target">Downstream target requesting the reservation.</param>
/// <returns>
/// True when the upstream block granted the reservation; false when it refused or when a
/// different target is already recorded as the holder.
/// </returns>
public bool ReserveMessage(
    DataflowMessageHeader messageHeader,
    ITargetBlock<O> target
)
{
    lock (OutgoingLock)
    {
        var heldByOther = reservationTarget != null && reservationTarget != target;
        if (heldByOther)
        {
            return false; //other target already holds a reservation.
        }

        if (!_sourceBlock.ReserveMessage(messageHeader, _target))
        {
            return false;
        }

        // Remember who holds the reservation so consume/release can be matched to it.
        reservationTarget = target;
        return true;
    }
}
| | | 121 | | |
/// <summary>
/// Tries to take one message from the upstream block and return its transformed value.
/// </summary>
/// <param name="filter">
/// Optional predicate evaluated against the transformed value of the candidate message.
/// </param>
/// <param name="item">The transformed message on success; otherwise the default value.</param>
/// <returns>
/// False when the upstream block is not an <see cref="IReceivableSourceBlock{TOutput}"/> or
/// did not yield a message; true otherwise.
/// </returns>
public bool TryReceive(Predicate<O>? filter, [MaybeNullWhen(false)] out O item)
{
    // Lift the caller's output-side filter to the input side by transforming the candidate first.
    Predicate<I>? upstreamFilter = filter == null
        ? null
        : candidate => filter(Transform(candidate));

    if (_sourceBlock is IReceivableSourceBlock<I> receivable
        && receivable.TryReceive(upstreamFilter, out var received))
    {
        item = Transform(received!);
        return true;
    }

    item = default;
    return false;
}
| | | 139 | | |
/// <summary>
/// Tries to take every available message from the upstream block and return the transformed values.
/// </summary>
/// <param name="items">The transformed messages, in upstream order, on success; otherwise null.</param>
/// <returns>
/// False when the upstream block is not an <see cref="IReceivableSourceBlock{TOutput}"/> or
/// had nothing to hand out; true otherwise.
/// </returns>
public bool TryReceiveAll([NotNullWhen(true)] out IList<O>? items)
{
    if (_sourceBlock is not IReceivableSourceBlock<I> receivable
        || !receivable.TryReceiveAll(out var drained))
    {
        items = default;
        return false;
    }

    var transformed = new List<O>(drained.Count);
    foreach (var input in drained)
    {
        transformed.Add(Transform(input!));
    }

    items = transformed;
    return true;
}
| | | 160 | | |
| | 31 | 161 | | private sealed class LinkTarget(SynchronousTransformingBlock<I,O> outer, ITargetBlock<O> target) : ITargetBlock< |
| | | 162 | | { |
| | 0 | 163 | | public Task Completion => target.Completion; |
| | | 164 | | |
| | 18 | 165 | | public void Complete() => target.Complete(); |
| | 8 | 166 | | public void Fault(Exception exception) => target.Fault(exception); |
| | | 167 | | public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, I messageValue, ISourceBlock< |
| | 100392 | 168 | | => target.OfferMessage(messageHeader, outer.Transform(messageValue), outer, consumeToAccept); |
| | | 169 | | } |
| | | 170 | | |
| | 30 | 171 | | private sealed class DummyTarget(SynchronousTransformingBlock<I, O> outer) : ITargetBlock<I> |
| | | 172 | | { |
| | 0 | 173 | | public Task Completion => outer.Completion; |
| | 0 | 174 | | public void Complete() => outer.Complete(); |
| | 0 | 175 | | public void Fault(Exception exception) => outer.Fault(exception); |
| | | 176 | | public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, I messageValue, ISourceBlock< |
| | 0 | 177 | | => DataflowMessageStatus.Postponed; |
| | | 178 | | } |
| | | 179 | | } |
| | | 180 | | } |