< Summary

Information
Class: CounterpointCollective.Dataflow.SynchronousTransformingBlock<T1, T2>
Assembly: Dataflow.Composable
File(s): /builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/SynchronousTransformingBlock.cs
Line coverage
70%
Covered lines: 57
Uncovered lines: 24
Coverable lines: 81
Total lines: 180
Line coverage: 70.3%
Branch coverage
62%
Covered branches: 20
Total branches: 32
Branch coverage: 62.5%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
get_Transform()100%11100%
get_Message()100%11100%
get_OutgoingLock()100%11100%
.cctor()50%22100%
Equals(...)100%210%
GetHashCode(...)100%210%
get_Lock()100%11100%
.ctor(...)100%44100%
LinkTo(...)100%11100%
get_Completion()100%11100%
Complete()100%210%
Fault(...)100%210%
ConsumeMessage(...)83.33%66100%
ReleaseReservation(...)0%620%
ReserveMessage(...)66.66%6687.5%
TryReceive(...)0%4260%
TryReceiveAll(...)100%66100%
.ctor(...)100%11100%
get_Completion()100%210%
Complete()100%11100%
Fault(...)100%11100%
OfferMessage(...)100%11100%
.ctor(...)100%11100%
get_Completion()100%210%
Complete()100%210%
Fault(...)100%210%
OfferMessage(...)100%210%

File(s)

/builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/SynchronousTransformingBlock.cs

#LineLine coverage
 1using System;
 2using System.Collections.Generic;
 3using System.Diagnostics.CodeAnalysis;
 4using System.Runtime.CompilerServices;
 5using System.Threading.Tasks;
 6using System.Threading.Tasks.Dataflow;
 7
 8namespace CounterpointCollective.Dataflow
 9{
 10    /// <summary>
 11    /// Important: transform must be idempotent and cheap! It will be run multiple times for the same input in some scen
 12    /// </summary>
 13    public sealed class SynchronousTransformingBlock<I, O>: IReceivableSourceBlock<O>
 14    {
 15        private readonly ISourceBlock<I> _sourceBlock;
 10460016        private Func<I, O> Transform { get; }
 17        private readonly DummyTarget _target;
 30953318        private record LastMessage(I Message, O TransformedMessage);
 424819        private object OutgoingLock { get; } = new();
 20        private ITargetBlock<O>? reservationTarget;
 21
 522        private static readonly IEqualityComparer<I> Comparer = typeof(I).IsValueType ? EqualityComparer<I>.Default : ne
 23
 24        private class ReferenceComparer : IEqualityComparer<I>
 25        {
 026            public bool Equals(I? x, I? y) => ReferenceEquals(x, y);
 027            public int GetHashCode([DisallowNull] I obj) => RuntimeHelpers.GetHashCode(obj);
 28        }
 29
 10463030        private object Lock { get; } = new();
 31        private LastMessage? lastMessage;
 32
 3033        public SynchronousTransformingBlock(ISourceBlock<I> sourceBlock, Func<I, O> transform)
 34        {
 3035            _sourceBlock = sourceBlock;
 3036            _target = new(this);
 3037            Transform = message =>
 3038            {
 10460039                lock (Lock)
 3040                {
 10460041                    if (lastMessage == null || !Comparer.Equals(message, lastMessage.Message))
 3042                    {
 10035143                        lastMessage = new LastMessage(message, transform(message));
 3044                    }
 3045                    else
 3046                    {
 3047                        //Serve the cached transformed value.
 3048                    }
 10460049                    return lastMessage.TransformedMessage;
 3050                }
 10463051            };
 3052        }
 53
 54        public IDisposable LinkTo(ITargetBlock<O> target, DataflowLinkOptions linkOptions)
 55        {
 3156            var t = new LinkTarget(this, target);
 3157            return _sourceBlock.LinkTo(t, linkOptions);
 58        }
 59
 660        public Task Completion => _sourceBlock.Completion;
 61
 062        public void Complete() => _sourceBlock.Complete();
 63
 064        public void Fault(Exception exception) => _sourceBlock.Fault(exception);
 65
 66        public O? ConsumeMessage(
 67            DataflowMessageHeader messageHeader,
 68            ITargetBlock<O> target,
 69            out bool messageConsumed
 70        )
 71        {
 419872            messageConsumed = false;
 419873            I? ret = default;
 419874            lock (OutgoingLock)
 75            {
 419876                if (reservationTarget == null || reservationTarget == target)
 77                {
 419878                    ret = _sourceBlock.ConsumeMessage(messageHeader, _target, out messageConsumed);
 419879                    reservationTarget = null;
 80                }
 419881            }
 419882            return messageConsumed ? Transform(ret!) : default;
 83        }
 84
 85        public void ReleaseReservation(
 86            DataflowMessageHeader messageHeader,
 87            ITargetBlock<O> target
 88        )
 89        {
 090            lock(OutgoingLock)
 91            {
 092                if (reservationTarget == target)
 93                {
 094                    _sourceBlock.ReleaseReservation(messageHeader, _target);
 095                    reservationTarget = null;
 96                }
 097            }
 098        }
 99
 100        public bool ReserveMessage(
 101            DataflowMessageHeader messageHeader,
 102            ITargetBlock<O> target
 103        )
 104        {
 20105            lock (OutgoingLock)
 106            {
 20107                if (reservationTarget == null || reservationTarget == target)
 108                {
 20109                    var res = _sourceBlock.ReserveMessage(messageHeader, _target);
 20110                    if (res)
 111                    {
 20112                        reservationTarget = target;
 113                    }
 20114                    return res;
 115                } else
 116                {
 0117                    return false; //other target already holds a reservation.
 118                }
 119            }
 20120        }
 121
 122        public bool TryReceive(Predicate<O>? filter, [MaybeNullWhen(false)] out O item)
 123        {
 0124            var filter2 = filter == null ? null : new Predicate<I>(i => filter(Transform(i)));
 125
 0126            if (_sourceBlock is IReceivableSourceBlock<I> receivableSource &&
 0127                receivableSource.TryReceive(filter2, out var v)
 0128            )
 129            {
 0130                item = Transform(v!);
 0131                return true;
 132            }
 133            else
 134            {
 0135                item = default;
 0136                return false;
 137            }
 138        }
 139
 140        public bool TryReceiveAll([NotNullWhen(true)] out IList<O>? items)
 141        {
 2142            if (_sourceBlock is IReceivableSourceBlock<I> receivableSource &&
 2143                receivableSource.TryReceiveAll(out var vs)
 2144            )
 145            {
 1146                var ret = new List<O>(vs.Count);
 22147                foreach (var v in vs)
 148                {
 10149                    ret.Add(Transform(v!));
 150                }
 1151                items = ret;
 1152                return true;
 153            }
 154            else
 155            {
 1156                items = default;
 1157                return false;
 158            }
 159        }
 160
 31161        private sealed class LinkTarget(SynchronousTransformingBlock<I,O> outer, ITargetBlock<O> target) : ITargetBlock<
 162        {
 0163            public Task Completion => target.Completion;
 164
 18165            public void Complete() => target.Complete();
 8166            public void Fault(Exception exception) => target.Fault(exception);
 167            public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, I messageValue, ISourceBlock<
 100392168                => target.OfferMessage(messageHeader, outer.Transform(messageValue), outer, consumeToAccept);
 169        }
 170
 30171        private sealed class DummyTarget(SynchronousTransformingBlock<I, O> outer) : ITargetBlock<I>
 172        {
 0173            public Task Completion => outer.Completion;
 0174            public void Complete() => outer.Complete();
 0175            public void Fault(Exception exception) => outer.Fault(exception);
 176            public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, I messageValue, ISourceBlock<
 0177                => DataflowMessageStatus.Postponed;
 178        }
 179    }
 180}