SubscribeAsync inside BackgroundService.ExecuteAsync fails to reach handler Action<TMessage>

Open chrisbewz opened this issue 1 year ago • 0 comments

I'm testing named pipes with the MessagePipe package in my current project, and some questions arose while I was writing the code.

I have a BackgroundService class that listens on a pipe, as in the code below:


public class EventBridgeService : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        NamedPipeServerStream pipeServer = default;

        try
        {
            cancellationToken.ThrowIfCancellationRequested();

            while (!cancellationToken.IsCancellationRequested)
                await using (pipeServer = new NamedPipeServerStream(this._namedPipeName,
                                 PipeDirection.InOut,
                                 NamedPipeServerStream.MaxAllowedServerInstances,
                                 PipeTransmissionMode.Message,
                                 PipeOptions.Asynchronous))
                {
                    // Configures the pipe server to wait for a connection while it blocks the execution loop.
                    await pipeServer.WaitForConnectionAsync(cancellationToken);

                    this._logger.LogInformation($"Client connected [{DateTime.Now.ToShortTimeString()}]");

                    // (PipeTransmissionMode.Message)
                    StreamReader sr = new(pipeServer);

                    string streamContent = await sr.ReadToEndAsync(cancellationToken);

                    if (!(streamContent.Length > 0))
                    {
                        this._logger.LogWarning(
                            $"Client connection opened but no data was sent through the pipe.");

                        this._logger.LogInformation(
                            $"Disposing pipe.");

                        pipeServer?.Disconnect();
                        continue;
                    }

                    this._logger.LogInformation(
                        $"Success on trying to read data from pipe. Data read [{streamContent}]");

                    EventStreamData? data =
                        JsonConvert.DeserializeObject<EventStreamData>(streamContent);

                    await Task.Delay(10, cancellationToken);

                    this._logger.LogInformation($"Disposing pipe.");

                    pipeServer?.Disconnect();
                }
        }
        finally
        {
            // Make sure the pipe is disposed before the service execution ends.
            if (pipeServer is not null)
                await pipeServer.DisposeAsync().ConfigureAwait(false);
        }
    }

    // Rest of the service implementation.
}

The client that writes to the named pipe does so through a .vbs script:

JSONData = "{some-json-string-data}"
Dim pipeClient
Set pipeClient = New NamedPipeClient.Init("some-pipe-name")

pipeClient.SendData JSONData

This code works flawlessly whenever I try to execute it, and every time the data is sent from vbs to the running background service, it's received as expected.

I tried to replace this logic with the named-pipe transport from MessagePipe.Interprocess:

Program.cs:

// somewhere inside the IHost build process the following is declared.
serviceCollection.AddMessagePipe()
                  .AddNamedPipeInterprocess(somePipeName, PipeServerConfigureAction);
// Note: PipeServerConfigureAction already sets `HostAsServer` to `true`, as specified in the docs.

BackgroundService.cs:


    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        // Bail out if we were already cancelled before executing the service logic.
        cancellationToken.ThrowIfCancellationRequested();

        while (!cancellationToken.IsCancellationRequested)
        {
            // _subscriber is an IDistributedSubscriber<string, string> injected through the service constructor.
            await this._subscriber.SubscribeAsync("", s =>
            {
                this._logger.LogInformation($"Client connected [{DateTime.Now.ToShortTimeString()}]");
                    
                if (!(s.Length > 0))
                {
                    this._logger.LogWarning(
                        $"Client connection opened but no data was sent through the pipe.");

                    this._logger.LogInformation(
                        $"Disposing pipe.");
                        
                    return;
                }

                this._logger.LogInformation(
                    $"Success on trying to read data from pipe. Data read [{s}]");

                // do something with data read from pipe

                this._logger.LogInformation(
                    $"Disposing pipe.");
                    
            }, cancellationToken);

            await Task.Delay(10, cancellationToken);
        }
    }

But this doesn't work as expected. On the first attempt nothing happens and the action passed to SubscribeAsync is never executed. Each subsequent attempt then results in an exception thrown by the .vbs script itself, saying that all available pipe instances are busy.

The same happens even if I change the key parameter in SubscribeAsync to exactly match the data sent from the .vbs script:

BackgroundService.cs:

await this._subscriber.SubscribeAsync("someDataString", s => //...

VBScript file:

Data = "someDataString"
Dim pipeClient
Set pipeClient = New NamedPipeClient.Init("some-pipe-name")

pipeClient.SendData Data 

The code still never reaches the handler action.

I may have made a mistake somewhere, since this is my first try with this package, or I may have misunderstood how IDistributedSubscriber<,> and the other interfaces used with named pipes work compared to how a NamedPipeServerStream does the job.
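
For reference, one variation that may be worth testing is registering the handler a single time and then simply keeping the service alive, instead of calling SubscribeAsync on every loop iteration. This is only a minimal sketch under assumptions: the class name SubscribeOnceService is hypothetical, the key "someDataString" is the one from the snippet above, and it relies on SubscribeAsync returning an IAsyncDisposable subscription handle. It does not address whether the script client writes data in the format MessagePipe expects on the pipe, which is a separate open question.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MessagePipe;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

// Hypothetical service name; only IDistributedSubscriber<,> and SubscribeAsync
// are taken from the code in this issue.
public sealed class SubscribeOnceService : BackgroundService
{
    private readonly IDistributedSubscriber<string, string> _subscriber;
    private readonly ILogger<SubscribeOnceService> _logger;

    public SubscribeOnceService(
        IDistributedSubscriber<string, string> subscriber,
        ILogger<SubscribeOnceService> logger)
    {
        _subscriber = subscriber;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        // Register the handler once, outside of any loop. The returned handle
        // is disposed automatically when this method exits.
        await using IAsyncDisposable subscription = await _subscriber.SubscribeAsync(
            "someDataString",
            message => _logger.LogInformation("Data read [{Message}]", message),
            cancellationToken);

        try
        {
            // Keep the subscription alive until the host requests shutdown.
            await Task.Delay(Timeout.Infinite, cancellationToken);
        }
        catch (OperationCanceledException)
        {
            // Expected on shutdown; nothing else to clean up here.
        }
    }
}
```

If the handler still never fires with a single subscription, the loop can be ruled out as the cause and the problem would more likely lie in how the client writes to the pipe.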

If anyone can help me to solve this problem I would be grateful.

chrisbewz, May 21 '24 19:05