| | | 1 | | using CounterpointCollective.Dataflow.Fluent; |
| | | 2 | | using CounterpointCollective.Utilities; |
| | | 3 | | using System.Threading.Tasks.Dataflow; |
| | | 4 | | |
| | | 5 | | namespace CounterpointCollective.Dataflow |
| | | 6 | | { |
| | | 7 | | public static class ParallelDataflowBlockExtensions |
| | | 8 | | { |
| | | 9 | | public static IPropagatorBlock<TInput, TOutput> Par<TInput, T1, T2, TOutput>( |
| | | 10 | | this IPropagatorBlock<TInput, T1> p1, |
| | | 11 | | IPropagatorBlock<TInput, T2> p2, |
| | | 12 | | Func<T1, T2, TOutput> f, |
| | | 13 | | GuaranteedBroadcastBlockOptions options |
| | | 14 | | ) |
| | | 15 | | { |
| | 1 | 16 | | var ret = new ParallelBlock<TInput, Either<T1, T2>, TOutput>(2, Recombine, options); |
| | | 17 | | |
| | | 18 | | TOutput Recombine(Either<T1, T2>[] inputs) |
| | | 19 | | { |
| | 10 | 20 | | var idx = inputs[0].IsLeft ? 0 : 1; |
| | 10 | 21 | | var l = inputs[idx].FromLeft; |
| | 10 | 22 | | var r = inputs[(idx + 1) % 2].FromRight; |
| | 10 | 23 | | return f(l, r); |
| | | 24 | | } |
| | | 25 | | |
| | 1 | 26 | | var pt1 = |
| | 1 | 27 | | p1.Pipeline() |
| | 10 | 28 | | .SynchronousTransform(e => Either<T1, T2>.Left(e)) |
| | 1 | 29 | | .Build(); |
| | | 30 | | |
| | 1 | 31 | | var pt2 = |
| | 1 | 32 | | p2.Pipeline() |
| | 10 | 33 | | .SynchronousTransform(e => Either<T1, T2>.Right(e)) |
| | 1 | 34 | | .Build(); |
| | | 35 | | |
| | 1 | 36 | | ret.Hookup(pt1, new() { PropagateCompletion = true }); |
| | 1 | 37 | | ret.Hookup(pt2, new() { PropagateCompletion = true }); |
| | | 38 | | |
| | 1 | 39 | | return ret; |
| | | 40 | | } |
| | | 41 | | |
| | | 42 | | public static IPropagatorBlock<I, I> Par<I>(this IPropagatorBlock<I, I>[] blocks) => |
| | 2 | 43 | | blocks.Par(new()); |
| | | 44 | | |
| | | 45 | | public static IPropagatorBlock<I, I> Par<I>( |
| | | 46 | | this IPropagatorBlock<I, I>[] blocks, |
| | | 47 | | GuaranteedBroadcastBlockOptions options |
| | 3 | 48 | | ) => Par(blocks, RecombineByVerifyingLockstepOutput, options); |
| | | 49 | | |
| | | 50 | | public static ParallelBlock<I, T, O> Par<I, T, O>( |
| | | 51 | | this IPropagatorBlock<I, T>[] blocks, |
| | | 52 | | Func<T[], O> recombine, |
| | | 53 | | GuaranteedBroadcastBlockOptions options |
| | | 54 | | ) |
| | | 55 | | { |
| | 3 | 56 | | var res = new ParallelBlock<I, T, O>( |
| | 3 | 57 | | blocks.Length, |
| | 3 | 58 | | recombine, |
| | 3 | 59 | | options |
| | 3 | 60 | | ); |
| | 3 | 61 | | res.LinkWorkers(blocks, new() { PropagateCompletion = true }); |
| | 3 | 62 | | return res; |
| | | 63 | | } |
| | | 64 | | |
| | | 65 | | private static I RecombineByVerifyingLockstepOutput<I>(I[] outputs) |
| | | 66 | | { |
| | 12 | 67 | | var first = outputs[0]; |
| | 80 | 68 | | for (var i = 1; i < outputs.Length; i++) |
| | | 69 | | { |
| | 28 | 70 | | if (!EqualityComparer<I>.Default.Equals(first, outputs[i])) |
| | | 71 | | { |
| | 0 | 72 | | throw new ArgumentException( |
| | 0 | 73 | | "Not all outputs were the same. The order of the message probably changed." |
| | 0 | 74 | | ); |
| | | 75 | | } |
| | | 76 | | } |
| | 12 | 77 | | return first; |
| | | 78 | | } |
| | | 79 | | } |
| | | 80 | | } |