
Seeking on Async FS is bugged / not-working

Open bluecoconut opened this issue 1 year ago • 15 comments

Here's a minimal example:

import fsspec
import asyncio

async def async_version():
    print("Async Version")
    fs = fsspec.filesystem("http", asynchronous=True)
    session = await fs.set_session()
    file = await fs.open_async("https://example.com/")
    print("Starting Tell", file.tell(), "seeking to 20")
    file.seek(20)
    print(f"Read 5 bytes, from tell of {file.tell()}:", await file.read(5), "now tell:", file.tell())
    file.seek(20)
    print(f"Read 5 bytes, from tell of {file.tell()}:", await file.read(5), "now tell:", file.tell())
    await file.close()
    await session.close()

def sync_version():
    print("Sync Version")
    fs = fsspec.filesystem("http")
    file = fs.open("https://example.com/")
    print("Starting Tell", file.tell(), "seeking to 20")
    file.seek(20)
    print(f"Read 5 bytes, from tell of {file.tell()}:", file.read(5), "now tell:", file.tell())
    file.seek(20)
    print(f"Read 5 bytes, from tell of {file.tell()}:", file.read(5), "now tell:", file.tell())
    file.close()

if __name__ == '__main__':
    asyncio.run(async_version())
    sync_version()

This outputs

Async Version
Starting Tell 0 seeking to 20
Read 5 bytes, from tell of 20: b'<!doc' now tell: 25
Read 5 bytes, from tell of 20: b'type ' now tell: 25
Sync Version
Starting Tell 0 seeking to 20
Read 5 bytes, from tell of 20: b'l>\n<h' now tell: 25
Read 5 bytes, from tell of 20: b'l>\n<h' now tell: 25

Note that the async version appears to respect seek and tell: it even updates .loc after a read, so .tell correctly reports the position. But the actual bytes that .read returns are wrong.

The document starts with <!doctype, so we can see that the two .read() calls are just reading sequentially: the seek in the async implementation has no effect on the returned bytes (despite updating .loc).
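
For context, this output is exactly what a purely sequential reader would produce if it tracked a position without ever consulting it. A minimal illustrative sketch of that failure mode (not fsspec's actual code):

class SequentialOnlyFile:
    """Tracks .loc for tell(), but read() ignores it and just keeps going."""
    def __init__(self, data):
        self.data = data
        self.loc = 0       # position reported by tell()
        self._cursor = 0   # actual stream position; only ever advances
    def seek(self, n):
        self.loc = n       # bookkeeping only; the stream cursor is untouched
    def tell(self):
        return self.loc
    def read(self, n):
        out = self.data[self._cursor:self._cursor + n]
        self._cursor += len(out)
        self.loc += len(out)   # tell() now looks right, but the bytes are wrong
        return out

f = SequentialOnlyFile(b"<!doctype html><html><head>")
f.seek(20)
print(f.read(5), f.tell())  # b'<!doc' 25 -- matches the async output above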

I actually originally found this via an s3 filesystem with cache_type='background', but as I removed things I eventually got all the way down to pure http and found that it still wasn't working.

bluecoconut avatar Jan 14 '25 21:01 bluecoconut

We should fix this - async/streaming files should not be seekable at all

martindurant avatar Jan 14 '25 21:01 martindurant

Hmm... is the "asynchronous" flag on the filesystem meant to map 1:1 to streaming? That feels like an over-specification / a mismatch from what I would expect when trying to use an async interface.

My goal when using async is to prevent IO bound operations from blocking. (I'm trying to read multiple files (from multiple urls / s3 keys) at once). Is there another way I can achieve this without going down the "streaming" path?

This is sort of what I originally thought I could do (just open with open_async)

import fsspec
import asyncio
import random

async def read_bytes(url, start, end):
    my_id = random.randint(0, 100000)
    print(f"[{my_id:05}] Starting")
    print(f"[{my_id:05}] Starting read from {start} to {end}")
    f = None
    try:
        fs = fsspec.filesystem("http")
        f = await fs.open_async(url, 'rb')
        f.seek(start)
        return await f.read(end - start)
    finally:
        if f is not None:  # open_async itself may raise
            await f.close()
        print(f"[{my_id:05}] Done")


async def batch_read():
    url = "https://ash-speed.hetzner.com/1GB.bin"
    offsets = [random.randint(0, 1024 * 1024 * 1024) for _ in range(5)]
    tasks = [read_bytes(url, o, o + 50 * 1024 * 1024) for o in offsets]
    results = await asyncio.gather(*tasks)
    print([len(r) for r in results])

if __name__ == '__main__':
    asyncio.run(batch_read())

but this gets an error

File "... lib/python3.12/site-packages/aiohttp/helpers.py", line 636, in __enter__
RuntimeError: Timeout context manager should be used inside a task

Digging around, I found the asynchronous=True flag on the fs class. Adding asynchronous=True to the filesystem(...) constructor removes the "Timeout context manager should be used inside a task" error, but then breaks the actual behavior: with that exact code, seek and read no longer behave correctly.

bluecoconut avatar Jan 14 '25 21:01 bluecoconut

Hmm... is the "asynchronous" flag on the filesystem meant to map 1:1 to streaming?

No, that specifies the intent on whether the async def _* methods will get called, or their sync counterparts.

However, the sync file-like from open() doesn't make sense in an async context, but it's probably a bad idea to overload open() to produce the streaming async variant when asynchronous=True.

martindurant avatar Jan 14 '25 21:01 martindurant

My goal when using async is to prevent IO bound operations from blocking

The sync file-like object does, of course, call down into the async code, so it is possible to get a true async and random-access file, but this is not exposed. We don't know what the API should look like, since IOBase is certainly sync; furthermore, a file-like is stateful (the file position), so races are very possible.

martindurant avatar Jan 14 '25 22:01 martindurant

Here's a version where I don't set the asynchronous=True flag, but still get unexpected behavior.

import fsspec
import asyncio
import random

async def read_bytes(url, start, end):
    my_id = random.randint(0, 100000)
    print(f"[{my_id:05}] Starting")
    print(f"[{my_id:05}] Starting read from {start} to {end}")
    f = None
    try:
        fs = fsspec.filesystem("http", loop=asyncio.get_running_loop())
        f = await fs.open_async(url, 'rb')
        f.seek(start)
        return await f.read(end - start)
    finally:
        if f is not None:  # open_async itself may raise
            await f.close()
        print(f"[{my_id:05}] Done")


async def batch_read():
    url = "https://ash-speed.hetzner.com/1GB.bin"
    offsets = [random.randint(0, 1024 * 1024 * 1024) for _ in range(5)]
    tasks = [read_bytes(url, o, o + 50 * 1024 * 1024) for o in offsets]
    results = await asyncio.gather(*tasks)
    print("Results:", [r[:15] for r in results])
    print([len(r) for r in results])

if __name__ == '__main__':
    asyncio.run(batch_read())

This runs, but outputs:

[17262] Starting
[17262] Starting read from 94493796 to 146922596
[00710] Starting
[00710] Starting read from 384436511 to 436865311
[45430] Starting
[45430] Starting read from 632900967 to 685329767
[96631] Starting
[96631] Starting read from 534823591 to 587252391
[02151] Starting
[02151] Starting read from 615312974 to 667741774
[45430] Done
[02151] Done
[00710] Done
[17262] Done
[96631] Done
Results: [b'\x08\x00)3\\\x0f\xb81\xd1\xfa\xcd\xea\xbe#\x97', b'\x08\x00)3\\\x0f\xb81\xd1\xfa\xcd\xea\xbe#\x97', b'\x08\x00)3\\\x0f\xb81\xd1\xfa\xcd\xea\xbe#\x97', b'\x08\x00)3\\\x0f\xb81\xd1\xfa\xcd\xea\xbe#\x97', b'\x08\x00)3\\\x0f\xb81\xd1\xfa\xcd\xea\xbe#\x97']
[16126, 16126, 16126, 16126, 16126]

By specifying the loop, I avoid the coroutine error (which I believe is due to fsspec creating its own event loop when one isn't specified).

Note that all the reads are identical (the seek is ignored) and are also not the full size they are supposed to be (16126 bytes each instead of 50 MiB).


In the code above, if I remove the async behavior from fsspec, things instead start to block (the underlying sync-over-async bridging doesn't prevent the sync calls from blocking each other).

import fsspec
import asyncio
import random

async def read_bytes(url, start, end):
    my_id = random.randint(0, 100000)
    print(f"[{my_id:05}] Starting")
    print(f"[{my_id:05}] Starting read from {start} to {end}")
    try:
        fs = fsspec.filesystem("http")
        f = fs.open(url, 'rb')
        f.seek(start)
        return f.read(end - start)
    finally:
        print(f"[{my_id:05}] Done")


async def batch_read():
    url = "https://ash-speed.hetzner.com/1GB.bin"
    offsets = [random.randint(0, 1024 * 1024 * 1024) for _ in range(5)]
    tasks = [read_bytes(url, o, o + 50 * 1024 * 1024) for o in offsets]
    results = await asyncio.gather(*tasks)
    print("Results:", [r[:15] for r in results])
    print([len(r) for r in results])

if __name__ == '__main__':
    asyncio.run(batch_read())

The calls block each other and there are no parallel requests happening.
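
For completeness, the only workaround I've found so far that does get parallel requests is pushing each blocking read onto a worker thread, which feels like it defeats the point of an async backend. A sketch (assuming Python 3.9+ for asyncio.to_thread):

import asyncio
import fsspec

def blocking_read(url, start, end):
    fs = fsspec.filesystem("http")
    with fs.open(url, "rb") as f:
        f.seek(start)
        return f.read(end - start)

async def read_bytes(url, start, end):
    # run the blocking fsspec call in the default thread pool
    return await asyncio.to_thread(blocking_read, url, start, end)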

Am I missing something obvious about the API / how to use fsspec?

bluecoconut avatar Jan 14 '25 22:01 bluecoconut

Based on your comment:

The sync file-like object does, of course, call down into the async code, so it is possible to get a true async and random-access file, but this is not exposed.

I decided to try to see if I could get the behavior I wanted by digging in a bit more.

I got something that now works, but took a lot of monkey-patching.

Specifically, I ended up monkeypatching:

  • fsspec.spec.AbstractBufferedFile.read_async = read_async
  • fsspec.caching.UpdatableLRU.acall = acall
  • fsspec.caching.BackgroundBlockCache._fetch_async = _fetch_async
  • fsspec.caching.BackgroundBlockCache._fetch_block_async = _fetch_block_async

I also created a cleanup method (fsspec.caching.BackgroundBlockCache._cleanup_async_future = _cleanup_async_future), and then, to side-step the .info call in open(...), I called the fsspec.implementations.http.HTTPFile initializer directly and wrote my own method to pre-populate the size.

Example code:
import fsspec
import asyncio
import random
import aiohttp

import logging

logger = logging.getLogger("fsspec")

async def read_async(self, length=-1):
    """
    Return data from cache, or fetch pieces as necessary

    Parameters
    ----------
    length: int (-1)
        Number of bytes to read; if <0, all remaining bytes.
    """
    length = -1 if length is None else int(length)
    if self.mode != "rb":
        raise ValueError("File not in read mode")
    if length < 0:
        length = self.size - self.loc
    if self.closed:
        raise ValueError("I/O operation on closed file.")
    if length == 0:
        # don't even bother calling fetch
        return b""
    out = await self.cache._fetch_async(self.loc, self.loc + length)
    logger.debug(
        "%s read: %i - %i %s",
        self,
        self.loc,
        self.loc + length,
        self.cache._log_stats(),
    )
    self.loc += len(out)
    return out

fsspec.spec.AbstractBufferedFile.read_async = read_async

async def _fetch_block_async(self, block_number: int, log_info: str = "sync") -> bytes:
    """
    Fetch the block of data for `block_number`.
    """
    if block_number > self.nblocks:
        raise ValueError(
            f"'block_number={block_number}' is greater than "
            f"the number of blocks ({self.nblocks})"
        )

    start = block_number * self.blocksize
    end = start + self.blocksize
    logger.info("BlockCache fetching block (%s) %d", log_info, block_number)
    self.total_requested_bytes += end - start
    self.miss_count += 1
    async_fetcher = self.fetcher.__self__.async_fetch_range
    block_contents = await async_fetcher(start, end)
    return block_contents

fsspec.caching.BackgroundBlockCache._fetch_block_async = _fetch_block_async

async def acall(self, asyncfunc, *args, **kwargs):
    if kwargs:
        raise TypeError(f"Got unexpected keyword argument {kwargs.keys()}")
    with self._lock:
        if args in self._cache:
            self._cache.move_to_end(args)
            self._hits += 1
            return self._cache[args]

    result = await asyncfunc(*args, **kwargs)

    with self._lock:
        self._cache[args] = result
        self._misses += 1
        if len(self._cache) > self._max_size:
            self._cache.popitem(last=False)

    return result

fsspec.caching.UpdatableLRU.acall = acall

async def _fetch_async(self, start: int, end: int) -> bytes:
    if start is None:
        start = 0
    if end is None:
        end = self.size
    if start >= self.size or start >= end:
        return b""

    # byte position -> block numbers
    start_block_number = start // self.blocksize
    end_block_number = end // self.blocksize

    fetch_future_block_number = None
    fetch_future = None
    with self._fetch_future_lock:
        # Background thread is running. Check whether we can or must join it.
        if self._fetch_future is not None:
            assert self._fetch_future_block_number is not None
            if self._fetch_future.done():
                logger.info("BlockCache joined background fetch without waiting.")
                self._fetch_block_cached.add_key(
                    await self._fetch_future, self._fetch_future_block_number
                )
                # Cleanup the fetch variables. Done with fetching the block.
                self._fetch_future_block_number = None
                self._fetch_future = None
            else:
                # Must join if we need the block for the current fetch
                must_join = bool(
                    start_block_number
                    <= self._fetch_future_block_number
                    <= end_block_number
                )
                if must_join:
                    # Copy to the local variables to release lock
                    # before waiting for result
                    fetch_future_block_number = self._fetch_future_block_number
                    fetch_future = self._fetch_future

                    # Cleanup the fetch variables. Have a local copy.
                    self._fetch_future_block_number = None
                    self._fetch_future = None

    # Need to wait for the future for the current read
    if fetch_future is not None:
        logger.info("BlockCache waiting for background fetch.")
        # Wait until result and put it in cache
        self._fetch_block_cached.add_key(
            await fetch_future, fetch_future_block_number
        )

    # these are cached, so safe to do multiple calls for the same start and end.
    for block_number in range(start_block_number, end_block_number + 1):
        # self._fetch_block_cached(block_number)
        await self._fetch_block_cached.acall(self._fetch_block_async, block_number)

    # fetch next block in the background if nothing is running in the background,
    # the block is within file and it is not already cached
    end_block_plus_1 = end_block_number + 1
    with self._fetch_future_lock:
        if (
            self._fetch_future is None
            and end_block_plus_1 <= self.nblocks
            and not self._fetch_block_cached.is_key_cached(end_block_plus_1)
        ):
            self._fetch_future_block_number = end_block_plus_1
            self._fetch_future = asyncio.ensure_future(
                self._fetch_block_async(end_block_plus_1, "async")
            )

    return self._read_cache(
        start,
        end,
        start_block_number=start_block_number,
        end_block_number=end_block_number,
    )

fsspec.caching.BackgroundBlockCache._fetch_async = _fetch_async

async def _cleanup_async_future(self):
    with self._fetch_future_lock:
        if self._fetch_future is not None:
            self._fetch_future.cancel()
            self._fetch_future = None

fsspec.caching.BackgroundBlockCache._cleanup_async_future = _cleanup_async_future


async def read_bytes(url, start, end):
    my_id = random.randint(0, 100000)
    print(f"[{my_id:05}] Starting")
    print(f"[{my_id:05}] Starting read from {start} to {end}")
    try:
        async with aiohttp.ClientSession() as session:
            fs = fsspec.filesystem("http")
            async with session.get(url, headers={"Range": "bytes=0-0"}) as response:
                size = int(response.headers.get("Content-Range", "bytes 0-0/0").split("/")[-1])
            f = fsspec.implementations.http.HTTPFile(
                fs,
                url,
                session=session,
                size=size,
                mode="rb",
                block_size=1024*1024*50,
                cache_type='background'
            )
            f.seek(start)
            data = await f.read_async(end - start)
            print(f"[{my_id:05}] Read {len(data)} bytes")
            await f.cache._cleanup_async_future()
            return data
    finally:
        print(f"[{my_id:05}] Done")


async def batch_read():
    url = "https://ash-speed.hetzner.com/1GB.bin"
    offsets = [random.randint(0, 1024 * 1024 * 1024) for _ in range(5)]
    tasks = [read_bytes(url, o, o + 50 * 1024 * 1024) for o in offsets]
    results = await asyncio.gather(*tasks)
    print("Results:", [r[:15] for r in results])
    print([len(r) for r in results])

if __name__ == '__main__':
    asyncio.run(batch_read())

In terms of APIs, I think the most confusing thing (that this thread highlighted) is that the open_async method returns an AsyncStreamFile. To me, conceptually, the streaming backends are a separable behavior from async handling, and it'd be nice if fsspec had a set of async/non-blocking APIs. (For my use-case, at the file level, all I need are async versions of open, read and close.)
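
Roughly, the minimal surface I have in mind would be something like this (a hypothetical sketch, not an existing fsspec class):

from typing import Protocol

class AsyncFileLike(Protocol):
    async def read(self, length: int = -1) -> bytes: ...
    def seek(self, loc: int, whence: int = 0) -> int: ...
    def tell(self) -> int: ...
    async def close(self) -> None: ...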

bluecoconut avatar Jan 14 '25 23:01 bluecoconut

You are quite right, there are two distinct things going on:

  • streaming files (i.e., no random-access), possibly with unknown length
  • async methods on a file

It is possible that you would want both, which is what open_async currently does, but each is independently useful in some cases.
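
Note that for the "concurrent reads" half of your goal, you don't necessarily need a file object at all: the filesystem-level coroutines take byte ranges directly. A sketch (assuming the server honours Range requests):

import asyncio
import fsspec

async def main():
    fs = fsspec.filesystem("http", asynchronous=True)
    session = await fs.set_session()
    url = "https://ash-speed.hetzner.com/1GB.bin"
    # one Range request per offset, all in flight concurrently
    parts = await asyncio.gather(
        *[fs._cat_file(url, start=o, end=o + 16) for o in (0, 1024, 2048)]
    )
    print([len(p) for p in parts])
    await session.close()

asyncio.run(main())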

There are some implementations of async file-like objects (aiofiles and such) but nothing standard. You couldn't pass this thing to anything expecting a standard IOBase object. Is a file an iterator of chunks, or lines, or something else? Can multiple coroutines wait on the same file?

There is exactly one sync streaming file, in HTTPFileSystem for the case that the size cannot be determined and/or byte-range requests are not allowed.

martindurant avatar Jan 22 '25 17:01 martindurant

We should fix this - async/streaming files should not be seekable at all

Can you clarify what you mean by this, especially the async part?

I'm currently trying to implement range requests with async filesystems and this is giving me a bit of a headache. For sync operations, this is rather trivial, as I can just seek to the start of the range, but if that's (intentionally?) not supported for async filesystems, how would I go about implementing this?

Assuming a naive sync implementation

def stream_with_offset(
    file: str, 
    fs: AbstractFileSystem, 
    offset: int = 0, 
    chunksize: int = 1024
):
    with fs.open(file) as fh:
        fh.seek(offset)
        while chunk := fh.read(chunksize):
           yield chunk

I would expect this async implementation to work as well

async def stream_with_offset(
    file: str, 
    fs: AsyncFileSystem, 
    offset: int = 0, 
    chunksize: int = 1024
):
    async with fs.open_async(file) as fh:
        fh.seek(offset)
        while chunk := await fh.read(chunksize):
           yield chunk

but without being able to seek, it seems I'd have to resort to something like this?

async def stream_with_offset(
    file: str, 
    fs: AsyncFileSystem, 
    offset: int = 0, 
    chunksize: int = 1024
):
    async with fs.open_async(file) as fh:
        if offset > 0:
            discarded = 0
            while discarded < offset:
                chunk = await fh.read(min(chunksize, offset - discarded))
                if not chunk:
                    break
                discarded += len(chunk)
        
        while chunk := await fh.read(chunksize):
            yield chunk

provinzkraut avatar Mar 15 '25 10:03 provinzkraut

I think this is a question of definitions. "Streaming", to me, means starting at byte 0 and reading forwards. In this case, it is conflated with "async mode", deferring to a lower-level API capable of reading the bytes chunk-wise. For HTTP, it can in some cases be the only option, where the server doesn't respect Range requests.

It would be conceivable to have an async file-like interface (async read(), async write()) which is random-access (with buffering), since the underlying calls are, after all, async for many backends. This would offer concurrency when reading from multiple files, but not within any given file, since files have internal state (the current location). Also, it would of course no longer be a standard API, but it might still be useful.
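
A minimal sketch of what that might look like (hypothetical, not an existing fsspec API); note the lock, which is exactly the statefulness problem:

import asyncio

class RandomAccessAsyncFile:
    """Hypothetical async, seekable file over an async range-fetcher."""

    def __init__(self, fetch_range, size):
        self._fetch_range = fetch_range  # async callable: (start, end) -> bytes
        self.size = size
        self.loc = 0
        self._lock = asyncio.Lock()  # the shared position forces serialisation

    def seek(self, loc):
        self.loc = loc
        return self.loc

    async def read(self, length=-1):
        # without the lock, two coroutines reading the same file could
        # interleave on self.loc and each get the wrong bytes
        async with self._lock:
            if length < 0:
                length = self.size - self.loc
            out = await self._fetch_range(self.loc, self.loc + length)
            self.loc += len(out)
            return out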

martindurant avatar Mar 17 '25 15:03 martindurant

Also, it would of course no longer be a standard API, but maybe would still be useful.

Could you elaborate on this? Non-standard in regard to which standard? If it would e.g. mirror the stdlib's file-like protocols, that would be fairly standard, and a naive implementation like the one I shared above would work.

Fwiw, anyio has an AsyncFile interface, which does just that.
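
e.g. (a minimal local-file sketch; assumes a data.bin exists on disk):

import anyio

async def main():
    async with await anyio.open_file("data.bin", "rb") as f:
        await f.seek(20)
        print(await f.read(5))

anyio.run(main)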

provinzkraut avatar Mar 17 '25 16:03 provinzkraut

If it would e.g. mirror the stdlib's "file like" protocols

Exactly what I mean.

I am aware of some libraries that have tried to do this kind of thing, actually quite a few - but they each do (similar but) different things!

martindurant avatar Mar 17 '25 16:03 martindurant

I think this is a question of definitions. "Streaming", to me, means starting at byte 0 and reading forwards. In this case, it is conflated with "async mode", deferring to a lower-level API capable of reading the bytes chunk-wise.

To me, this was the unexpected thing, because I would have expected open_async to return, conceptually, the same kind of object open returns, and not a stream as per this definition.

Suppose one were to add an interface to fsspec for retrieving an async file-like object. Seeing that open_async is already taken, and I guess making it return a different type is going to be a headache in terms of backwards compatibility, do you have any suggestions for where something like this should be added instead, and whether this is something you'd be interested in supporting?

provinzkraut avatar Mar 18 '25 08:03 provinzkraut

open_async isn't much used for the very reasons you specify. We could rename it "open_stream" and have open_async be the functionality you imagined it to be, or have flags on open_async to choose between the different implementations.

Alternatively, a filesystem should know when it is asynchronous, and the current open() doesn't really make any sense in an async context - it could be the thing that returns an async file-like object.

The only reservation I have is that a new class allowing for async read() will end up looking a whole lot like the current blocking AbstractBufferedFile implementations, and I'm not sure how we can avoid duplicating a load of things. Can the two be done in the same class?

martindurant avatar Mar 20 '25 14:03 martindurant

Alternatively, a filesystem should know when it is asynchronous, and the current open() doesn't really make any sense in an async context - it could be the thing that returns an async file-like object.

How does that fit into the current sync/async design of fsspec? IIUC, even async implementations provide sync wrappers, and if you want to use the async-native methods, you should use the _-prefixed equivalents. In that case, shouldn't open return a sync thing and _open return the async equivalent?

The only reservation I have, is that a new class allowing for async read() will end up looking a whole lot like the current AbstractBufferedFile blocking implementations, and I'm not sure how we can prevent duplicating a load of things. Can the two be done in the same class?

Could be done in the same class for sure, but personally I prefer doing as much work as possible sans-IO (e.g. in a base class) and then just accept that I've got some code duplication (or use a tool like unasyncd to generate the code for me 🙂).
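
To sketch what I mean by that split (hypothetical names; the position bookkeeping lives in a pure base class, and the sync/async shells stay thin):

class ReadPlanner:
    """Pure bookkeeping shared by both variants: no IO happens here."""

    def __init__(self, size):
        self.size = size
        self.loc = 0

    def plan_read(self, length):
        start = self.loc
        end = self.size if length < 0 else min(self.size, start + length)
        self.loc = end
        return start, end


class SyncFile(ReadPlanner):
    def fetch_range(self, start, end):  # supplied by the backend
        raise NotImplementedError

    def read(self, length=-1):
        start, end = self.plan_read(length)
        return self.fetch_range(start, end)  # blocking IO


class AsyncFile(ReadPlanner):
    async def fetch_range(self, start, end):  # supplied by the backend
        raise NotImplementedError

    async def read(self, length=-1):
        start, end = self.plan_read(length)
        return await self.fetch_range(start, end)  # async IO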

provinzkraut avatar Mar 20 '25 16:03 provinzkraut

even async implementations provide sync wrappers

Yes, but (pretty much) the sync variants should only be called from sync code and the async methods only from async code. The exception is when the caller has arranged for event loops to be running on different threads, which is reserved for certain advanced usage.

Note that _open is not async, but a normal method that the user isn't expected to call directly. Probably the naming convention could have been better. There are other non-async methods in this category, like _strip_protocol.

martindurant avatar Mar 20 '25 16:03 martindurant