SubscribeAsync inside BackgroundService.ExecuteAsync fails to reach handler Action<TMessage>
I'm trying out named pipes with the MessagePipe package in my current project, and some questions arose while I was writing the code.
I have a BackgroundService class that listens on a pipe, as in the code below:
public class EventBridgeService : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        NamedPipeServerStream pipeServer = default;
        try
        {
            cancellationToken.ThrowIfCancellationRequested();
            while (!cancellationToken.IsCancellationRequested)
            {
                await using (pipeServer = new NamedPipeServerStream(this._namedPipeName,
                    PipeDirection.InOut,
                    NamedPipeServerStream.MaxAllowedServerInstances,
                    PipeTransmissionMode.Message,
                    PipeOptions.Asynchronous))
                {
                    // Wait asynchronously for a client to connect; the loop does not advance until one does.
                    await pipeServer.WaitForConnectionAsync(cancellationToken);
                    this._logger.LogInformation($"Client connected [{DateTime.Now.ToShortTimeString()}]");
                    // Read everything the client sent (the pipe uses PipeTransmissionMode.Message).
                    StreamReader sr = new(pipeServer);
                    string streamContent = await sr.ReadToEndAsync(cancellationToken);
                    if (!(streamContent.Length > 0))
                    {
                        this._logger.LogWarning(
                            "Client connection opened but no data was sent through the pipe.");
                        this._logger.LogInformation(
                            "Disposing pipe.");
                        pipeServer?.Disconnect();
                        continue;
                    }
                    this._logger.LogInformation(
                        $"Success on trying to read data from pipe. Data read [{streamContent}]");
                    EventStreamData? data =
                        JsonConvert.DeserializeObject<EventStreamData>(streamContent);
                    await Task.Delay(10, cancellationToken);
                    this._logger.LogInformation("Disposing pipe.");
                    pipeServer?.Disconnect();
                }
            }
        }
        finally
        {
            // Make sure the pipe is disposed before the service execution ends.
            if (pipeServer is not null)
            {
                await pipeServer.DisposeAsync().ConfigureAwait(false);
            }
        }
    }
    // rest of service code implementation
}
The client that writes to the named pipe does so through a .vbs script:
JSONData = "{some-json-string-data}"
Dim pipeClient
Set pipeClient = New NamedPipeClient.Init("some-pipe-name")
pipeClient.SendData JSONData
This code works flawlessly: every time the data is sent from the VBScript to the running background service, it is received as expected.
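For anyone who wants to reproduce this without the VBScript wrapper, a minimal C# client along the following lines should be roughly equivalent. This is only a sketch under assumptions: `PipeTestClient` and `SendAsync` are hypothetical names that are not part of my project, and I am assuming the VBScript `NamedPipeClient` class simply connects to the local pipe, writes the string as UTF-8, and closes the connection.

```csharp
using System;
using System.IO.Pipes;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical test client; not part of the original project.
public static class PipeTestClient
{
    // Connects to a local named pipe, writes the payload as UTF-8, then closes the pipe.
    public static async Task SendAsync(
        string pipeName,
        string payload,
        CancellationToken cancellationToken = default)
    {
        // "." means the local machine.
        await using var client = new NamedPipeClientStream(
            ".", pipeName, PipeDirection.InOut, PipeOptions.Asynchronous);

        // Give up if no server instance becomes available within 5 seconds (arbitrary value).
        await client.ConnectAsync(5000, cancellationToken);

        byte[] bytes = Encoding.UTF8.GetBytes(payload);
        await client.WriteAsync(bytes, cancellationToken);
        await client.FlushAsync(cancellationToken);

        // Leaving this method disposes the client, which closes its end of the pipe
        // so that a ReadToEndAsync call on the server side can complete.
    }
}
```

It would be called as `await PipeTestClient.SendAsync("some-pipe-name", "{some-json-string-data}");`, using the same placeholder pipe name and payload as the script above.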
I then tried to replace this logic with the named pipe support from MessagePipe.Interprocess:
Program.cs:
// somewhere inside the IHost build process the following is declared.
serviceCollection.AddMessagePipe()
.AddNamedPipeInterprocess(somePipeName, PipeServerConfigureAction);
// Note: PipeServerConfigureAction already sets `HostAsServer` to `true`, as specified in the docs.
BackgroundService.cs:
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
    // Bail out if cancellation was already requested before the service logic runs.
    cancellationToken.ThrowIfCancellationRequested();
    while (!cancellationToken.IsCancellationRequested)
    {
        // this._subscriber is an IDistributedSubscriber<string, string> resolved through the service constructor.
        await this._subscriber.SubscribeAsync("", s =>
        {
            this._logger.LogInformation($"Client connected [{DateTime.Now.ToShortTimeString()}]");
            if (!(s.Length > 0))
            {
                this._logger.LogWarning(
                    "Client connection opened but no data was sent through the pipe.");
                this._logger.LogInformation(
                    "Disposing pipe.");
                return;
            }
            this._logger.LogInformation(
                $"Success on trying to read data from pipe. Data read [{s}]");
            // do something with data read from pipe
            this._logger.LogInformation(
                "Disposing pipe.");
        }, cancellationToken);
        await Task.Delay(10, cancellationToken);
    }
}
But this doesn't work as expected. On the first attempt nothing happens and the action passed to SubscribeAsync is never executed; each subsequent attempt then results in an exception thrown by the VBScript itself, saying that all available pipe instances are busy.
The same happens even if I change the key parameter in SubscribeAsync to exactly match the data sent from the VBScript:
BackgroundService.cs:
await this._subscriber.SubscribeAsync("someDataString", s => //...
VBScript file:
Data = "someDataString"
Dim pipeClient
Set pipeClient = New NamedPipeClient.Init("some-pipe-name")
pipeClient.SendData Data
The code still does not reach the handler action.
I suspect I made a mistake somewhere, since this is my first try with this package, or I may have misunderstood how IDistributedSubscriber<,> and the other interfaces used with named pipes work compared with how a NamedPipeServerStream does the job.
If anyone can help me solve this problem, I would be grateful.