Reduce thread usage in pipelining
The following test runs very quickly with the min thread count elevated (e.g. 32) and very slowly without (e.g. 16). I believe this is because the underlying streams are opened in a non-async way, which means that our piping consumes threads, possibly more threads than it needs to because of how async is implemented:
[Test]
public void TestPipeline()
{
    ThreadPool.SetMinThreads(100, 100);
    const int ProcessCount = 10;
    var pipeline = Enumerable.Range(0, ProcessCount)
        .Select(_ => TestShell.Run(SampleCommand, "pipebytes"))
        .Aggregate((first, second) => first | second);
    try
    {
        for (var i = 0; i < 10; ++i)
        {
            char @char = (char)('a' + i);
            pipeline.StandardInput.AutoFlush.ShouldEqual(true);
            var writeTask = pipeline.StandardInput.WriteAsync(@char);
            writeTask.Wait(TimeSpan.FromSeconds(30)).ShouldEqual(true, $"write {i} should complete");
            var buffer = new char[10];
            var readTask = pipeline.StandardOutput.ReadAsync(buffer, 0, buffer.Length);
            readTask.Wait(TimeSpan.FromSeconds(30)).ShouldEqual(true, $"read {i} should complete");
            readTask.Result.ShouldEqual(1);
            buffer[0].ShouldEqual(@char);
        }
        pipeline.StandardInput.Dispose();
        pipeline.Task.Wait(TimeSpan.FromSeconds(30)).ShouldEqual(true, "pipeline should exit");
    }
    finally
    {
        pipeline.Kill();
    }
}
// in program.cs
case "pipebytes":
    using (var standardInput = Console.OpenStandardInput())
    using (var standardOutput = Console.OpenStandardOutput())
    {
        var buffer = new byte[10];
        while (true)
        {
            var bytesRead = standardInput.Read(buffer, 0, buffer.Length);
            if (bytesRead == 0) { break; }
            standardOutput.Write(buffer, 0, bytesRead);
            standardOutput.Flush();
        }
    }
    break;
Possible fixes: try to get the pipes to switch over to async IO, or even just use non-async reads/flushes to avoid the extra thread use.
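To make the second option concrete, here is a minimal sketch of piping one stream into another with plain blocking calls on a single dedicated thread, so that each link in a pipeline costs exactly one non-pool thread. `SyncPipe` and `Start` are hypothetical names used only for illustration (they are not MedallionShell API), and the 1024-byte buffer is an arbitrary choice.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of MedallionShell: copies source to destination
// using blocking Read/Write/Flush calls on one dedicated (non-thread-pool) thread.
public static class SyncPipe
{
    public static Task Start(Stream source, Stream destination)
    {
        var completion = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        var thread = new Thread(() =>
        {
            try
            {
                var buffer = new byte[1024]; // arbitrary size
                int bytesRead;
                // Read returns 0 at end of stream, which ends the loop.
                while ((bytesRead = source.Read(buffer, 0, buffer.Length)) > 0)
                {
                    destination.Write(buffer, 0, bytesRead);
                    destination.Flush(); // forward each chunk immediately
                }
                completion.SetResult(true);
            }
            catch (Exception ex)
            {
                // Surface IO failures through the returned task.
                completion.SetException(ex);
            }
        })
        { IsBackground = true };

        thread.Start();
        return completion.Task;
    }
}
```

For example, `SyncPipe.Start(first.StandardOutput.BaseStream, second.StandardInput.BaseStream)` would connect two processes, assuming both expose their redirected streams that way. The trade-off is one blocked thread per pipe, but the count is fixed and does not depend on the thread pool's min thread setting.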
Possibly relevant: https://devblogs.microsoft.com/oldnewthing/20130812-00/?p=3533
The following sample code fails with "pipe busy" because the pipe can't have any more handles open:
void Main()
{
    var path = @"C:\Users\mikea_000\Documents\Interests\CS\MedallionShell\SampleCommand\bin\Debug\net46\SampleCommand.exe";
    var proc = new Process { StartInfo = { FileName = path, Arguments = "pipe", UseShellExecute = false, RedirectStandardInput = true, RedirectStandardOutput = true, RedirectStandardError = true } };
    try
    {
        proc.Start();
        var stdinHandle = ((FileStream)proc.StandardInput.BaseStream).SafeFileHandle;
        // desired access: 0x40000000 = GENERIC_WRITE
        // share mode: 1 | 2 | 4 = FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE
        // flags: FILE_FLAG_OVERLAPPED | FILE_FLAG_SEQUENTIAL_SCAN | FILE_FLAG_DELETE_ON_CLOSE
        var newHandle = ReOpenFile(stdinHandle, 0x40000000, 1 | 2 | 4, 0x40000000 | 0x08000000 | 0x04000000);
        // the last Win32 error is only meaningful if the call failed
        if (newHandle.IsInvalid)
        {
            Marshal.ThrowExceptionForHR(Marshal.GetHRForLastWin32Error());
        }
        newHandle.Dump();
    }
    finally
    {
        proc.Kill();
    }
}

[DllImport("kernel32.dll", SetLastError = true)]
private static extern SafeFileHandle ReOpenFile(SafeFileHandle hOriginalFile, uint dwDesiredAccess, uint dwShareMode, uint dwFlagsAndAttributes);
Some initial (seemingly unsuccessful) work to use async IO on Linux: https://github.com/madelson/MedallionShell/tree/asyncify-on-linux
Tried playing around with ReOpenFile a bit more for this on Windows, to no avail:
https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-reopenfile
void Main()
{
    var sc = @"C:\Users\...\MedallionShell\SampleCommand\bin\Debug\net46\SampleCommand.exe";
    var process = new Process { StartInfo = { FileName = sc, RedirectStandardOutput = true, RedirectStandardInput = true, UseShellExecute = false, CreateNoWindow = true } };
    process.Start();
    try
    {
        var standardOutput = (FileStream)process.StandardOutput.BaseStream;
        // 0x40000000 = FILE_FLAG_OVERLAPPED
        var newHandle = ReOpenFile(standardOutput.SafeFileHandle.DangerousGetHandle(), FileAccess.Read, FileShare.Write, ((FileAttributes)0x40000000).Dump());
        newHandle.Dump();
        Marshal.GetLastWin32Error().Dump();
        var newSafeHandle = new SafeFileHandle(newHandle, ownsHandle: true);
        var stream = new FileStream(newSafeHandle, FileAccess.Read, 1024, isAsync: true);
    }
    finally
    {
        process.Kill();
    }
}

[DllImport("kernel32.dll", SetLastError = true)]
public static extern IntPtr ReOpenFile(IntPtr originalFile, FileAccess dwDesiredAccess, FileShare dwShareMode, FileAttributes dwFlagsAndAttributes);
One option here on Linux would be to use poll (https://devarea.com/linux-io-multiplexing-select-vs-poll-vs-epoll/#.XfoZdOhKiUk). We could then have one thread polling all 3 stdio streams for a process, or even across multiple processes (getting this exactly right would be tricky because of the need to sync up poll times, but it could probably be done with clever handling of timeouts).
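A rough sketch of what the poll approach could look like from C#, assuming a Linux system where the library name "libc" resolves for P/Invoke (some setups may need "libc.so.6" instead). `PollSketch` and `WaitForReadable` are hypothetical names, and the sketch takes raw file descriptors: how to obtain them for a process's redirected streams is deliberately left out. It also does not retry when poll is interrupted by a signal (EINTR).

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;

// Sketch only: Linux-specific P/Invoke into libc's poll(2).
public static class PollSketch
{
    [StructLayout(LayoutKind.Sequential)]
    public struct PollFd
    {
        public int Fd;        // file descriptor to watch
        public short Events;  // events we are interested in
        public short REvents; // events reported back by the kernel
    }

    public const short POLLIN = 0x0001; // data available to read (Linux value)

    // nfds_t is "unsigned long" on Linux, hence UIntPtr.
    [DllImport("libc", SetLastError = true)]
    private static extern int poll([In, Out] PollFd[] fds, UIntPtr nfds, int timeoutMillis);

    // Blocks for up to timeoutMillis (-1 = wait forever) and returns the indices
    // of the descriptors that reported any event (data, hang-up, or error).
    public static int[] WaitForReadable(int[] fileDescriptors, int timeoutMillis)
    {
        var pollFds = new PollFd[fileDescriptors.Length];
        for (var i = 0; i < pollFds.Length; ++i)
        {
            pollFds[i] = new PollFd { Fd = fileDescriptors[i], Events = POLLIN };
        }

        var readyCount = poll(pollFds, (UIntPtr)(uint)pollFds.Length, timeoutMillis);
        if (readyCount < 0)
        {
            throw new InvalidOperationException("poll failed with errno " + Marshal.GetLastWin32Error());
        }

        var ready = new List<int>(readyCount);
        for (var i = 0; i < pollFds.Length; ++i)
        {
            if (pollFds[i].REvents != 0) { ready.Add(i); }
        }
        return ready.ToArray();
    }
}
```

A single thread could call `WaitForReadable` in a loop over the descriptors of all the streams it services and then do a read only on the ones reported ready, which is what would let one thread cover all 3 stdio streams.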
On Windows there is PeekNamedPipe (https://docs.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-peeknamedpipe) which lets us scan a pipe without blocking. The problem is that this doesn't block at all, so we just end up busy-waiting unless we add artificial sleeps.
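For reference, a minimal sketch of the PeekNamedPipe-plus-sleep approach, which shows where the busy-waiting comes from. `PeekSketch` and `WaitForData` are hypothetical names, and the backoff values (start at 1 ms, double up to 50 ms) are arbitrary assumptions rather than tuned numbers.

```csharp
using System;
using System.ComponentModel;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;

// Sketch only: Windows-specific polling of a pipe via PeekNamedPipe.
public static class PeekSketch
{
    [DllImport("kernel32.dll", SetLastError = true)]
    [return: MarshalAs(UnmanagedType.Bool)]
    private static extern bool PeekNamedPipe(
        SafeHandle hNamedPipe,
        IntPtr lpBuffer,               // null: we only want the byte count
        uint nBufferSize,
        IntPtr lpBytesRead,
        out uint lpTotalBytesAvail,
        IntPtr lpBytesLeftThisMessage);

    // Returns true once the pipe has unread bytes, false if the timeout elapses.
    // Throws Win32Exception if the peek itself fails (e.g. the pipe is broken).
    public static bool WaitForData(SafeHandle pipeHandle, TimeSpan timeout)
    {
        var stopwatch = Stopwatch.StartNew();
        var sleepMillis = 1;
        while (true)
        {
            uint available;
            if (!PeekNamedPipe(pipeHandle, IntPtr.Zero, 0, IntPtr.Zero, out available, IntPtr.Zero))
            {
                throw new Win32Exception(Marshal.GetLastWin32Error());
            }
            if (available > 0) { return true; }
            if (stopwatch.Elapsed >= timeout) { return false; }

            // PeekNamedPipe never blocks, so without this sleep the loop would spin.
            Thread.Sleep(sleepMillis);
            sleepMillis = Math.Min(sleepMillis * 2, 50); // arbitrary backoff cap
        }
    }
}
```

The sleep keeps CPU use down but adds up to the cap in latency to every read, which is the trade-off described above.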