Support S3 data loading
This PR introduces S3 data loading to IndexedDataset. In particular, the .idx file is downloaded to a local directory at initialization so that we can memory map it, and the .bin file is streamed into memory block-by-block.
Draft implementation for #698
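For illustration, here is a minimal sketch of the range-read primitive such a reader can be built on, assuming boto3 and hypothetical bucket/key names (the PR's actual _S3BinReader differs in its details):

```python
import boto3

def s3_read_range(client, bucket: str, key: str, start: int, end: int) -> bytes:
    """Download the byte range [start, end) of an S3 object.

    HTTP Range headers are inclusive on both ends, hence end - 1.
    """
    response = client.get_object(Bucket=bucket, Key=key, Range=f"bytes={start}-{end - 1}")
    return response["Body"].read()

# Hypothetical usage: stream one 1 MiB block of a .bin file into memory.
client = boto3.client("s3")
block = s3_read_range(client, "my-bucket", "dataset/train.bin", 0, 1024 * 1024)
```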
Left some comments. Thanks a bunch for the PR! I'd like to see some consolidation in S3IndexedDataset before it's merged.
@jkamalu Thank you for the comments! Based on those comments and after seeing this refactor, I refactored S3 data loading to integrate it into IndexedDataset and address your feedback.
@jkamalu The _S3BinReader will have poor performance when using a global random shuffle over samples (which is what GPTDataset currently does). I need to either implement "block shuffling" in GPTDataset as described in the "Example" section here (that section also describes why _S3BinReader will have poor performance) or I need to add an option to disable shuffling in GPTDataset (the user then has to be responsible for preshuffling their data). I'm inclined to just add the option to disable shuffling to start, because it's simpler. What do you think?
Moving the "Example" section from the old NeMo PR into this comment.
In NeMo, a sample consists of seq_length tokens. For simplicity, suppose each token is 1 byte and seq_length is 100.
Each sample then takes 100 bytes.
Suppose we have a dataset with 12 samples.
Sample index 0 is stored in bytes [0, 100), sample index 1 is stored in bytes [100, 200), ..., and sample index 11 is stored in bytes [1100, 1200).
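In code, the byte range for a sample index is simple arithmetic (a sketch using the numbers above):

```python
seq_length = 100  # bytes per sample in this example (1 byte per token)

def sample_byte_range(idx: int) -> tuple:
    """Return the half-open byte range [start, end) for a sample index."""
    return idx * seq_length, (idx + 1) * seq_length

assert sample_byte_range(0) == (0, 100)
assert sample_byte_range(11) == (1100, 1200)
```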
Currently, NeMo takes the list of sample indices:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
And produces a shuffle_idx, which is just a permutation of those sample indices like:
[11, 3, 0, 6, 1, 9, 10, 2, 5, 7, 4, 8]
The shuffle_idx determines the order in which NeMo processes samples.
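A shuffle_idx like that can be sketched with a seeded NumPy RNG (GPTDataset builds its shuffle index similarly; the seed here is illustrative):

```python
import numpy as np

num_samples = 12
rng = np.random.RandomState(seed=1234)
shuffle_idx = rng.permutation(num_samples)  # some permutation of [0, ..., 11]
```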
We could have the IndexedDataset just grab the bytes for one sample at a time. The first request would be for the bytes [1100, 1200), the second request would be for the bytes [300, 400), the third request would be for the bytes [0, 100), and so on in the order determined by shuffle_idx. That works, but it's slow, because you're making one request per sample.
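A sketch of that one-request-per-sample approach, reusing the hypothetical s3_read_range and sample_byte_range helpers from the sketches above:

```python
# One GetObject request per sample: correct, but slow for large datasets.
for idx in shuffle_idx:
    start, end = sample_byte_range(int(idx))
    sample_bytes = s3_read_range(client, "my-bucket", "dataset/train.bin", start, end)
```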
Let's try to introduce an in-memory cache. In particular, suppose the IndexedDataset does this:
- If the requested bytes range [start, end) is in the cache, then extract the requested bytes range from the cache.
- Otherwise, first refresh the cache by downloading the bytes range [start, start + cache_nbytes) and then extract the requested bytes range from the cache.
Suppose the cache_nbytes is 400. The first request would be for the bytes [1100, 1200). The cache is initially empty, so we refresh the cache by downloading the bytes [1100, 1500) and then extract the requested bytes range from the cache. The second request would be for the bytes [300, 400). Those bytes are not in the cache, so we refresh the cache by downloading the bytes [300, 700) and then extract the requested bytes range from that cache. And so on.
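A sketch of this naive cache, again reusing the hypothetical s3_read_range helper:

```python
class NaiveCachedReader:
    """Cache scheme described above: on a miss, refresh from the requested offset."""

    def __init__(self, client, bucket: str, key: str, cache_nbytes: int = 400):
        self._client, self._bucket, self._key = client, bucket, key
        self._cache_nbytes = cache_nbytes
        self._cache_start = None  # offset of the first cached byte
        self._cache = b""

    def read(self, start: int, end: int) -> bytes:
        hit = (
            self._cache_start is not None
            and self._cache_start <= start
            and end <= self._cache_start + len(self._cache)
        )
        if not hit:
            # Cache miss: refresh the cache starting at the requested offset.
            self._cache_start = start
            self._cache = s3_read_range(
                self._client, self._bucket, self._key, start, start + self._cache_nbytes
            )
        offset = start - self._cache_start
        return self._cache[offset : offset + (end - start)]
```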
We actually made the problem worse. For most samples, we still have to refresh the cache, so we have not reduced the number of requests much; we've just made each request download more bytes. The issue is that the bytes needed for a sample index are probably not adjacent to the bytes needed for the previous sample index.
To use the cache effectively, we have to introduce some correlation in the shuffle. In particular, we divide the original list of sample indices into blocks like:
- [0, 1, 2, 3]
- [4, 5, 6, 7]
- [8, 9, 10, 11]
We then shuffle within the blocks like:
- [3, 0, 2, 1]
- [4, 6, 5, 7]
- [11, 10, 8, 9]
We then shuffle the order of the blocks like:
- [11, 10, 8, 9]
- [4, 6, 5, 7]
- [3, 0, 2, 1]
And we construct the block-shuffled shuffle_idx like:
[11, 10, 8, 9, 4, 6, 5, 7, 3, 0, 2, 1]
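A sketch of building such a block-shuffled shuffle_idx with NumPy (block_size and seed are illustrative parameters):

```python
import numpy as np

def build_block_shuffle_idx(num_samples: int, block_size: int, seed: int) -> np.ndarray:
    rng = np.random.RandomState(seed)
    # Divide the sample indices into consecutive blocks.
    blocks = [
        np.arange(start, min(start + block_size, num_samples))
        for start in range(0, num_samples, block_size)
    ]
    for block in blocks:
        rng.shuffle(block)  # shuffle within each block
    rng.shuffle(blocks)     # shuffle the order of the blocks
    return np.concatenate(blocks)

# For num_samples=12 and block_size=4, some seed could yield the order above:
# [11, 10, 8, 9, 4, 6, 5, 7, 3, 0, 2, 1]
```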
We also have to change which bytes we download on a cache miss. In particular, we download the bytes [cache_start, cache_start + cache_nbytes), where cache_start is (start//cache_nbytes) * cache_nbytes.
The first request would be for the bytes [1100, 1200). The cache is initially empty, so we refresh the cache by downloading the bytes [800, 1200) and then extract the requested bytes range from that cache. The second request would be for the bytes [1000, 1100). We extract those bytes from the cache. The third request would be for the bytes [800, 900). We extract those bytes from the cache. And so on. In this way, we only have to refresh the cache at the start of each new block.
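The only change to the naive cache sketch above is the offset we refresh from on a miss:

```python
def aligned_cache_start(start: int, cache_nbytes: int) -> int:
    """Offset to download from on a cache miss, aligned to a cache_nbytes boundary."""
    return (start // cache_nbytes) * cache_nbytes

assert aligned_cache_start(1100, 400) == 800  # first request downloads [800, 1200)
assert aligned_cache_start(1000, 400) == 800  # second request hits the same cached block
```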
Marking as stale. No activity in 60 days.
Merged in https://github.com/NVIDIA/Megatron-LM/commit/a30a28dbe9063e8456ddc2f5ee1d26ede8589f63
Can mark as closed, thanks