BroadcastChannel
and ChannelMux
- [ ] ==TODO: ADD NUGET PACKAGE LINK==
Available on Nuget
BroadcastChannel
and ChannelMux
This library offers APIs similar to Channel<T>
for scenarios where multiple readers need to receive all data being sent by a single writer. For example, a service which writes to a channel and multiple readers that each perform some task, such as data processing, analysis, or streaming over the network to a client. Additionally, with ChannelMux
, multiple channels can be input to a single await
able output.
In addition to standalone, direct instantiation, BroadcastChannel
and ChannelMux
are designed to work well with Generic Host based Dependency Injection.
BroadcastChannel
Has a single input source/writer which writes to any number of readers, all of which are guaranteed to receive all written data.
This is in contrast to System.Threading.Channels.Channel
or a Queue type where each input is only ever read by a
single output.
ChannelMux
ChannelMux
is meant to aggregate multiple BroadcastChannel
into a single, awaitable object.
It is a generic type and each type parameter has a dedicated TryRead(out T data)
method.
ChannelMuxInput
acts as a writer to BroadcastChannelWriter
and each has a
SingleProducerSingleConsumerQueue
.
Note that each ChannelMuxInput
is a single input, single output where single means both a single instance writing
and a single instance reading, and thus can be optimized using SingleProducerSingleConsumerQueue
. ChannelMux
is not meant to be shared across threads.
flowchart LR
Producing_Object1("Producing Object") --> BroadcastChannelWriter1
Producing_Object2("Producing Object") --> BroadcastChannelWriter2
subgraph BroadcastChannel1 [BroadcastChannel]
BroadcastChannelWriter1["BroadcastChannelWriter"]
BroadcastChannelReader1_1["BroadcastChannelReader"]
BroadcastChannelReader1_2["BroadcastChannelReader"]
BroadcastChannelReader1_3["BroadcastChannelReader"]
end
BroadcastChannelReader1_1 --> Consuming_Object_BR1_1("Consuming Object")
BroadcastChannelReader1_2 --> Consuming_Object_BR1_2("Consuming Object")
BroadcastChannelReader1_3 --> Consuming_Object_BR1_3("Consuming Object")
subgraph BroadcastChannel2 [BroadcastChannel]
BroadcastChannelWriter2["BroadcastChannelWriter"]
BroadcastChannelWriter2 --> BroadcastChannelReader2_1["BroadcastChannelReader"]
BroadcastChannelWriter2 --> BroadcastChannelReader2_2["BroadcastChannelReader"]
BroadcastChannelWriter2 --> BroadcastChannelReader2_3["BroadcastChannelReader"]
end
BroadcastChannelReader2_1 --> Consuming_Object_BR2_1("Consuming Object")
BroadcastChannelReader2_2 --> Consuming_Object_BR2_2("Consuming Object")
BroadcastChannelReader2_3 --> Consuming_Object_BR2_3("Consuming Object")
subgraph ChannelMux
ChannelMuxT_1["ChannelMux<T1,T2,...>"]
ChannelMuxInput1["ChannelMuxInput"] --> ChannelMuxT_1
ChannelMuxInput2["ChannelMuxInput"] --> ChannelMuxT_1
end
BroadcastChannelWriter1 --> ChannelMuxInput1
BroadcastChannelWriter1 --> BroadcastChannelReader1_1
BroadcastChannelWriter1 --> BroadcastChannelReader1_2
BroadcastChannelWriter1 --> BroadcastChannelReader1_3
BroadcastChannelWriter2 --> ChannelMuxInput2
ChannelMuxT_1 --> Consuming_Object_1("Consuming Object")
Consuming_Object_BR1_1 ~~~ ChannelMux
Consuming_Object_BR2_1 ~~~ ChannelMux
BroadcastChannel2 ~~~ ChannelMux
Response Channels
BroadcastChannel<T>
is a subclass of BroadcastChannel<TData,TResponse>
where TResponse
is a default implementation of the generic IBroadcastChannelResponse
which provides the ability to pass an exception back to the writer (see Response Channels).
/// <summary>
/// Response message container
/// </summary>
public interface IBroadcastChannelResponse {
/// <summary>
/// Set to a <see cref="System.Exception"/> if one has occurred, else leave <c>null</c>
/// </summary>
public System.Exception? Exception { get; init; }
}
For more complex response types, use BroadcastChannel<TData,TResponse>
directly.
Notes on examples
In this documentation BroadcastChannel<T>
is used as a placeholder for both BroadcastChannel<T>
and BroadcastChannel<TData,TResponse>
, as the behavior is the identical between them. The same applies to BroadcastChannelWriter<T>
and BroadcastChannelReader<T>
.
Usage
BroadcastChannel
Much like Channel<T>
, create a BroadcastChannel
and use it to retrieve the writer as well as create new readers. BroadcastChannel<T>.Writer
returns the single BroadcastChannelWriter<T>
for the BroadcastChannel<T>
and any amount of calls to a BroadcastChannel<T>
's .Writer
property will always return the same instance.
using var broadcastChannel = new BroadcastChannel<ChannelMessage, ChannelResponse>();
using BroadcastChannel<ChannelMessage, ChannelResponse> writer1 = broadcastChannel.Writer;
using BroadcastChannel<ChannelMessage, ChannelResponse> writer2 = broadcastChannel.Writer;
Object.ReferenceEquals( writer1, writer2 ); // true
BroadcastChannelReader<T>
s are created with BroadcastChannel<T>.CreateReader<T>()
which will always return a new BroadcastChannelReader<T>
which will immediately begin receiving any data written by the associated BroadcastChannelWriter<T>
.
using var broadcastChannel = new BroadcastChannel<ChannelMessage, ChannelResponse>();
using IBroadcastChannelReader<ChannelMessage, ChannelResponse> reader1 = broadcastChannel.CreateReader();
using IBroadcastChannelReader<ChannelMessage, ChannelResponse> reader2 = broadcastChannel.CreateReader();
Object.ReferenceEquals( reader1, reader2 ); // false
Response Channels
The response channel on a BroadcastChannel
allows BroadcastChannel
to return arbitrary data to the writer, such as status, errors, etc.
Example
using var broadcastChannel = new BroadcastChannel<ChannelMessage, ChannelResponse>();
using IBroadcastChannelReader<ChannelMessage, ChannelResponse> reader = broadcastChannel.CreateReader();
using BroadcastChannel<ChannelMessage, ChannelResponse> writer = broadcastChannel.Writer;
await reader.WriteResponseAsync( new ChannelResponse( -1, taskName, new EmptyException( "Incomplete sequence" ) ), ct );
if( writer.TryReadResponse( out ChannelResponse? response ) ){
// work with response
}
ChannelMux
Construction
Create ChannelMux
from a set of BroadcastChannelWriter
s.
BroadcastChannel<DataTypeA> channel1 = new ();
BroadcastChannel<DataTypeB> channel2 = new ();
ChannelMux<DataTypeA, DataTypeB> mux = new (channel1.Writer, channel2.Writer);
Dependency Injection
ChannelMux
can be injected using a Dependency Injection container. The below examples use Microsoft.Extensions.DependencyInjection
.
var mux = _host.Services.GetRequiredService<ChannelMux<ChannelMessageSubA, ChannelMessageSubB, ChannelMessageSubC>>();
Or to gain access to the IBroadcastChannelReaderSource
var readerSourceA = _host.Services.GetRequiredService<IBroadcastChannelReaderSource<ChannelMessageSubA>>();
var readerSourceB = _host.Services.GetRequiredService<IBroadcastChannelReaderSource<ChannelMessageSubB>>();
var mux = new ChannelMux<ChannelMessageSubA, ChannelMessageSubB>( readerSourceA, readerSourceB );
An example of ChannelMux
used with constructor based Dependency Injection.
class ChannelMuxConsumer {
private ChannelMux<ChannelMessageSubA, ChannelMessageSubB, ChannelMessageSubC>> _mux;
public ChannelMuxConsumer ( ChannelMux<ChannelMessageSubA, ChannelMessageSubB, ChannelMessageSubC>> mux) => this._mux = mux;
public void DoWork(){
while ( await _mux.WaitToReadAsync( _cts.Token ) ) {
if ( _mux.TryRead( out ChannelMessageSubA? msgA ) ) {
// work with msgA
}
if ( _mux.TryRead( out ChannelMessageSubB? msgB ) ) {
// work with msgB
}
if ( _mux.TryRead( out ChannelMessageSubC? msgC ) ) {
// work with msgC
}
}
}
Read
Then loop and read
while ( await mux.WaitToReadAsync( _cts.Token ) ) {
if ( mux.TryRead( out ChannelMessageSubA? msgA ) ) {
msgA.Id.Should().Be( receivedCountA );
lastMsgA = msgA;
receivedCountA++;
}
if ( mux.TryRead( out ChannelMessageSubB? msgB ) ) {
msgB.Id.Should().Be( receivedCountB );
lastMsgB = msgB;
receivedCountB++;
}
}
Replacing Channels
Individual channels can be replaced on a live ChannelMux
. This is useful when the channels on a mux have different lifetimes. For example, most of the channels are long lived, but one channel is for smaller requests that return some data and then complete. By replacing a completed channel with a new one, the same mux can be used regardless of the lifetime of individual component channels.
using BroadcastChannel<DataTypeA> channel1 = new ();
using BroadcastChannel<DataTypeB> channel2 = new (); // this will be closed
using BroadcastChannel<DataTypeB> channelReplacement1 = new ();
using ChannelMux<DataTypeA, DataTypeB> mux = new (channel1.Writer, channel2.Writer);
// replacing an active channel will throw ChannelNotClosedException
mux.ReplaceChannel( channelReplacement1.Writer ); // throws
// to replace an active channel, use:
mux.ReplaceChannel( channelReplacement1.Writer, force: true );
// channelReplacement1 is now writing data into the mux
Dependency Injection Configuration
Use AddBroadcastChannels()
to automatically create a Broadcast Channel for any Data Type without any further configuration (with a non-specified (default IBroadcastChannelResponse
) response type):
Host.CreateDefaultBuilder( Array.Empty<string>() ).ConfigureServices(
services => {
services.AddBroadcastChannels();
} ).Build();
var channel = host.Services.GetRequiredService<IBroadcastChannel<ChannelMessageSubA, ChannelResponse>>();
For a specific Channel Data and Response type use AddBroadcastChannel<TData,TResponse>()
which must be run for each type pair desired:
Host.CreateDefaultBuilder( Array.Empty<string>() ).ConfigureServices(
services => {
services.AddBroadcastChannel<ChannelMessageSubA, ChannelResponse>();
services.AddBroadcastChannels();
} ).Build();
var channel = host.Services.GetRequiredService<IBroadcastChannel<ChannelMessageSubA, ChannelResponse>>();
When a specific Channel Data and Response type are added to the service collection, the default is mapped to it as well.
In the example below, both channelWithReponse
and defaultResponseTypeChannel
will be on the same channel (and receive the same data). Also note that AddBroadcastChannels()
and AddBroadcastChannel<TData,TResponse>()
can be used together.
Host.CreateDefaultBuilder( Array.Empty<string>() ).ConfigureServices(
services => {
services.AddBroadcastChannel<ChannelMessageSubA, ChannelResponse>();
services.AddBroadcastChannels();
} ).Build();
var channelWithReponse = host.Services.GetRequiredService<IBroadcastChannel<ChannelMessageSubA, ChannelResponse>>();
var defaultResponseTypeChannel = host.Services.GetRequiredService<IBroadcastChannel<ChannelMessageSubA>>();
Things to keep in mind
There is no (practical) limit to the amount of data that can be written to a channel. If a reader is created, but never read from, this could potentially consume a huge amount of memory. Under the hood, each
BroadcastChannelReader
uses anUnboundedChannel
. There is no reason an unbounded channel must be used. The ability to specify a limit and options for the action to perform when that limit if reached (such as withBoundedChannel
) could be a added in the future.Most, if not all types implement
IAsyncDisposable
orIDisposable
and as such, if they are directly instantiated, they must be disposed of. If the objects were acquired by Dependency Injection, the service container takes care of disposal and the programmer should not.Dispose()
manually.
Readers must exist before their received data
A BroadcastChannelReader
must be created before any data being written to it. This might seem obvious, but this isn't the case when using System.Threading.Channels
. System.Threading.Channels
has a single internal queue that all readers share where on each read, one value is removed from the queue whereas an independent data store is created for each new BroadcastChannelReader
.
There is no intermediate queue where data resides waiting for a non-existent reader to read from it. If there is no reader created, then any data written to the BroadcastChannelWriter
will simply be discarded. The reader can only receive data written after it's creation.
Example:
BroadcastChannel<int> channel = new ();
var writer = channel.Writer;
writer.TryWrite<int>( 1 ); // writer writes before reader is created
var reader = channel.GetReader(); // only at this point is the Reader's data queue allocated.
bool result = reader.TryRead( out int? data );
// result is false
// data is null
Data types in ChannelMux
must be unique
ChannelMux<T1,T2>
has sub types (ChannelMux<T1,T2>
, ChannelMux<T1,T2,T3>
, etc.) with 2+ type parameters and these type parameters identify the type of a specific channel's data. The data types are used to differentiate the TryRead<T>( out T )
methods. Unspecified mayhem may occur if the same type is passed in as a generic type argument for more than one type parameter.
Future
- Allow responses via
BroadcastChannelReaderSource
Additional Information
Contributions
Contributions and PRs are welcome.