ZstdSharp

Allow passing in compressed size into DecompressionStream

Open xPaw opened this issue 1 year ago • 2 comments

I have a stream that contains multiple zstd streams with other data in between, so I need to read exactly the compressed bytes, whose size I know. I seemingly can't use DecompressionStream for this, because it reads from the underlying stream up to its buffer size, which may read past the end of my compressed data.

I can't really set the stream buffer size to anything other than the full compressed size, which would defeat the purpose of using the stream here.

My current non-stream code is this:

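private static void DecompressZSTD(ZstdSharp.Decompressor zstdDecompressor, BinaryReader reader, Span<byte> output, int compressedSize)
{
    // Rent a buffer large enough to hold the whole compressed block.
    var inputBuf = ArrayPool<byte>.Shared.Rent(compressedSize);

    try
    {
        var input = inputBuf.AsSpan(0, compressedSize);

        // Read may return fewer bytes than requested, so loop until the block is complete.
        var totalRead = 0;
        while (totalRead < compressedSize)
        {
            var read = reader.Read(input.Slice(totalRead));
            if (read == 0)
            {
                throw new EndOfStreamException($"Expected {compressedSize} compressed bytes, got {totalRead}");
            }
            totalRead += read;
        }

        if (!zstdDecompressor.TryUnwrap(input, output, out var written) || output.Length != written)
        {
            throw new InvalidDataException($"Failed to decompress ZSTD (expected {output.Length} bytes, got {written})");
        }
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(inputBuf);
    }
}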

The issue is basically here: DecompressionStream reads past my total compressed size to fill its input buffer. https://github.com/oleg-st/ZstdSharp/blob/511daf1c42ee5ac820d12c0234d4bb395ed86f72/src/ZstdSharp/DecompressionStream.cs#L129-L131

xPaw • Apr 27 '25 09:04

You can put this logic into the inner stream. Its Read is allowed to return less data than requested (the buffer size), so a wrapper stream can stop at the compressed size. DecompressionStream decompresses all the data it already has before reading the next part.

oleg-st • Apr 27 '25 20:04

@xPaw

    private static void DecompressZSTD(ZstdSharp.Decompressor zstdDecompressor, BinaryReader reader, Span<byte> output, int compressedSize)
    {
        using var decompressionStream = new ZstdSharp.DecompressionStream(new LimitedReadStream(reader, compressedSize), zstdDecompressor);
        decompressionStream.ReadExactly(output);
    }

LimitedReadStream is something like this (written by ChatGPT)

using System;
using System.IO;

public class LimitedReadStream : Stream
{
    private readonly BinaryReader _baseReader;
    // Total byte limit for this stream. It never changes; progress is tracked in _bytesRead.
    private readonly long _bytesRemaining;
    private long _bytesRead;

    public LimitedReadStream(BinaryReader baseReader, long length)
    {
        if (baseReader == null)
            throw new ArgumentNullException(nameof(baseReader));
        if (length < 0)
            throw new ArgumentOutOfRangeException(nameof(length), "Length must be non-negative.");

        _baseReader = baseReader;
        _bytesRemaining = length;
        _bytesRead = 0;
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;

    public override long Length => _bytesRemaining;

    public override long Position
    {
        get => _bytesRead;
        set => throw new NotSupportedException("Setting the position is not supported.");
    }

    public override void Flush()
    {
        // No-op since stream is read-only
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
            throw new ArgumentNullException(nameof(buffer));
        // Compare against buffer.Length - offset so that offset + count cannot overflow.
        if (offset < 0 || count < 0 || count > buffer.Length - offset)
            throw new ArgumentOutOfRangeException();

        long bytesLeft = _bytesRemaining - _bytesRead;
        if (bytesLeft <= 0)
            return 0;

        int toRead = (int)Math.Min(count, bytesLeft);
        int read = _baseReader.Read(buffer, offset, toRead);
        _bytesRead += read;
        return read;
    }

    public override long Seek(long offset, SeekOrigin origin) =>
        throw new NotSupportedException("Seeking is not supported.");

    public override void SetLength(long value) =>
        throw new NotSupportedException("Setting length is not supported.");

    public override void Write(byte[] buffer, int offset, int count) =>
        throw new NotSupportedException("Writing is not supported.");
}

oleg-st • May 10 '25 10:05