
Directly connect to AWS S3 parquet

Open carlosMorell opened this issue 1 year ago • 13 comments

I've been looking through the documentation for several days, but I can't find a way, or any examples, to extract data from a Parquet file hosted in AWS S3... I don't know if I'm missing something obvious, but I'm getting stuck on this topic.

carlosMorell avatar Feb 12 '24 03:02 carlosMorell

Hey! Sorry, we are still working on better documentation 🚧 (help is more than welcome).

Please look at the example below; you should be able to replace from_json with from_parquet and it should work:


use function Flow\ETL\Adapter\JSON\from_json;
use function Flow\ETL\Adapter\JSON\to_json;
use function Flow\ETL\DSL\concat;
use function Flow\ETL\DSL\lit;
use function Flow\ETL\DSL\ref;
use Flow\ETL\Adapter\Filesystem\AwsS3Stream;
use Flow\ETL\Adapter\Filesystem\AzureBlobStream;
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Flow;
use Symfony\Component\Dotenv\Dotenv;

// load the S3/Azure credentials into $_ENV (assumes a local .env file)
(new Dotenv())->load(__DIR__ . '/.env');

$s3_client_option = [
    'client' => [
        'credentials' => [
            'key' => $_ENV['S3_KEY'],
            'secret' => $_ENV['S3_SECRET'],
        ],
        'region' => 'eu-west-2',
        'version' => 'latest',
    ],
    'bucket' => 'flow-php',
];

$blob_account = $_ENV['AZURE_BLOB_ACCOUNT'];
$blob_key = $_ENV['AZURE_BLOB_KEY'];

$azure_blob_connection_string = [
    'connection-string' => "DefaultEndpointsProtocol=https;AccountName={$blob_account};AccountKey={$blob_key}",
    'container' => 'flow-php',
];

AwsS3Stream::register();
AzureBlobStream::register();

(new Flow())
    ->read(from_json(new Path('flow-aws-s3://dataset.json', $s3_client_option)))
    ->withEntry('id', ref('id')->cast('integer'))
    ->withEntry('name', concat(ref('name'), lit(' '), ref('last name')))
    ->drop('last name')
    ->write(to_json(new Path('flow-azure-blob://dataset_test.json', $azure_blob_connection_string)))
    ->run();
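For Parquet specifically, here is a minimal sketch of the same idea (assuming the flow-php/etl-adapter-parquet package is installed; dataset.parquet and the local output path are just placeholders):

use function Flow\ETL\Adapter\JSON\to_json;
use function Flow\ETL\Adapter\Parquet\from_parquet;
use Flow\ETL\Adapter\Filesystem\AwsS3Stream;
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Flow;

AwsS3Stream::register();

// reuse the same $s3_client_option array as above
(new Flow())
    ->read(from_parquet(new Path('flow-aws-s3://dataset.parquet', $s3_client_option)))
    ->write(to_json(new Path(__DIR__ . '/dataset_local.json')))
    ->run();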

please let me know if that worked 😊

norberttech avatar Feb 12 '24 10:02 norberttech

My apologies for taking so long to respond, but I wanted to exhaust the possibilities before taking up your time...

Actually, I am trying to read portions of a Parquet file and, if possible, obtain an object that I can traverse and manipulate for insertion into a MySQL database...

I have run many tests, but I keep getting failures in the Schema... and I don't know what I'm doing wrong.

It will be a pleasure to collaborate and help document this once I get it working ;)

I'm leaving you the code and the errors below; if you could shed a little light on this issue, no obligation, it would help a lot.

Translated with DeepL.com (free version)

require '../vendor/autoload.php';

use function Flow\ETL\Adapter\Parquet\{from_parquet, to_parquet};
//use function Flow\ETL\Adapter\JSON\from_json;
//use function Flow\ETL\Adapter\JSON\to_json;
use function Flow\ETL\DSL\concat;
use function Flow\ETL\DSL\lit;
use function Flow\ETL\DSL\ref;
use function Flow\ETL\DSL\to_output;
use Flow\ETL\Adapter\Filesystem\AwsS3Stream;
//use Flow\ETL\Adapter\Filesystem\AzureBlobStream;
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Flow;
use Flow\ETL\Loader\StreamLoader\Output;
use Symfony\Component\Dotenv\Dotenv;

$s3_client_option = [
    'client' => [
        'credentials' => [
            'key' => 'xxxxxx',
            'secret' => 'xxxxx',
        ],
        'region' => 'us-east-1',
        'version' => 'latest',
    ],
    'bucket' => 'dbs3export',
];

$parquet = 'prod-02/xx/xx.xx/1/part-00000-71b81402-1b66-4029-99e6-c056d6fa35ff-c000.gz.parquet';

AwsS3Stream::register();

print '<pre>';

(new Flow())
    ->read(from_parquet(new Path('flow-aws-s3://' . $parquet, $s3_client_option)))
    ->withEntry('id', ref('id')->cast('integer'))
    ->write(to_output(false))
    // ->withEntry('name', concat(ref('name'), lit(' '), ref('last name')))
    // ->drop('last name')
    // ->write(to_json(new Path('flow-azure-blob://dataset_test.json', $azure_blob_connection_string)))
    ->run();

Here are the errors:

Warning: fseek(): Stream does not support seeking in C:\xampp\htdocs\panel__core\vendor\flow-php\parquet\src\Flow\Parquet\ParquetFile.php on line 53

Warning: fseek(): Stream does not support seeking in C:\xampp\htdocs\panel__core\vendor\flow-php\parquet\src\Flow\Parquet\ParquetFile.php on line 59

Warning: fseek(): Stream does not support seeking in C:\xampp\htdocs\panel__core\vendor\flow-php\parquet\src\Flow\Parquet\ParquetFile.php on line 65

Fatal error: Uncaught TypeError: Flow\Parquet\ParquetFile\Schema::fromThrift(): Argument #1 ($schemaElements) must be of type array, null given, called in C:\xampp\htdocs\panel__core\vendor\flow-php\parquet\src\Flow\Parquet\ParquetFile\Metadata.php on line 22 and defined in C:\xampp\htdocs\panel__core\vendor\flow-php\parquet\src\Flow\Parquet\ParquetFile\Schema.php:26
Stack trace:
#0 C:\xampp\htdocs\panel__core\vendor\flow-php\parquet\src\Flow\Parquet\ParquetFile\Metadata.php(22): Flow\Parquet\ParquetFile\Schema::fromThrift(NULL)
#1 C:\xampp\htdocs\panel__core\vendor\flow-php\parquet\src\Flow\Parquet\ParquetFile.php(70): Flow\Parquet\ParquetFile\Metadata::fromThrift(Object(Flow\Parquet\Thrift\FileMetaData))
#2 C:\xampp\htdocs\panel__core\vendor\flow-php\parquet\src\Flow\Parquet\ParquetFile.php(113): Flow\Parquet\ParquetFile->metadata()
#3 C:\xampp\htdocs\panel__core\vendor\flow-php\parquet\src\Flow\Parquet\ParquetFile.php(124): Flow\Parquet\ParquetFile->schema()
#4 C:\xampp\htdocs\panel__core\vendor\flow-php\etl-adapter-parquet\src\Flow\ETL\Adapter\Parquet\ParquetExtractor.php(42): Flow\Parquet\ParquetFile->values(Array, NULL)
#5 [internal function]: Flow\ETL\Adapter\Parquet\ParquetExtractor->extract(Object(Flow\ETL\FlowContext))
#6 C:\xampp\htdocs\panel__core\vendor\flow-php\etl\src\Flow\ETL\Pipeline\SynchronousPipeline.php(65): Generator->valid()
#7 C:\xampp\htdocs\panel__core\vendor\flow-php\etl\src\Flow\ETL\DataFrame.php(662): Flow\ETL\Pipeline\SynchronousPipeline->process(Object(Flow\ETL\FlowContext))
#8 C:\xampp\htdocs\panel__core\aws\s3flow.php(46): Flow\ETL\DataFrame->run()
#9 {main}
  thrown in C:\xampp\htdocs\panelFormula__core\vendor\flow-php\parquet\src\Flow\Parquet\ParquetFile\Schema.php on line 26

carlosMorell avatar Feb 14 '24 17:02 carlosMorell

Hey! Thanks for the detailed explanation. I think I know what's going on, but let me test everything properly before I come back to you with an answer. I'm gonna keep this issue open until you confirm that it's working; sorry for closing it too early.

norberttech avatar Feb 14 '24 18:02 norberttech

Uhm, I don't have good news. There were 2 problems: one I managed to solve, which unblocked writing to remote streams; however, reading is a bit more problematic...

It's all related to how Parquet is implemented. In the https://github.com/flow-php/parquet implementation, Reader and Writer both rely on \fseek to jump back and forward inside the file while reading different pieces of it. The problem is that remote filesystems are in general just HTTP streams, which do not support seeking...
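Parquet keeps its metadata in a footer at the very end of the file, so a reader has to jump to the end first, which is exactly what a plain HTTP stream can't do. A minimal illustration of that limitation (the URL is a placeholder; requires allow_url_fopen):

// plain http(s) streams in PHP are not seekable
$stream = \fopen('https://example.com/file.parquet', 'rb');

var_dump(\stream_get_meta_data($stream)['seekable']); // bool(false)

// a Parquet reader starts by seeking towards the footer, e.g.:
\fseek($stream, -8, \SEEK_END); // returns -1 on a non-seekable stream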

For writing, this is bypassed by first writing the entire file to the filesystem cache and uploading it to the remote location only when the stream is closed. This way, seek operates on a temporary stream that supports seeking. Reading is a different story.
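A rough illustration of that write-side workaround (not the actual Flow internals; $parquetBytes and the AWS SDK upload call are just stand-ins):

// buffer everything in a local, seekable temp stream first...
$buffer = \fopen('php://temp', 'r+b');
\fwrite($buffer, $parquetBytes); // the writer can fseek() freely here
\rewind($buffer);

// ...and only push the bytes to the remote location on "close"
$s3Client->putObject([
    'Bucket' => 'flow-php',
    'Key'    => 'dataset.parquet',
    'Body'   => $buffer,
]);
\fclose($buffer);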

I'm gonna need to invest more time into this to either figure out a way to avoid seeking (some internal caching, maybe) or a way to bypass this PHP limitation and emulate seeking.

It's very high on my priorities list now, I will keep working on it until I find a good solution.

Again, thank you for reporting this issue, please monitor this issue, I will post updates here 🫡

norberttech avatar Feb 14 '24 19:02 norberttech

No, thank you, for your effort and work. I'll stay tuned for updates on the issue.

carlosMorell avatar Feb 14 '24 20:02 carlosMorell

So this solves "Writing to remote storage": https://github.com/flow-php/flow/pull/989. Reading will take me a bit longer; at this point I don't even have a good idea how to solve it. It seems I might not be able to avoid reading the entire file and caching it locally, but I'm investigating whether I could at least use reactphp/amphp to read remote files in batches and then combine them locally, which might speed up the entire process.

norberttech avatar Feb 14 '24 20:02 norberttech

Finally, after a lot of thinking, I have opted to download the Parquet file to a local directory and use the code below to read it. But I still have doubts...

How do I know how many rows the complete Parquet file has? In the Schema it is protected and cannot be accessed dynamically.

And how do I set a starting point to traverse the Parquet file? In the documentation I see the limit... but not a way to set an initial position and go in steps of x until the end... In fact, with this code I get an error because of the offset... PHP says this parameter doesn't exist... but if I delete it, it works.

use Flow\Parquet\Reader;

$reader = new Reader();

$file = $reader->read('path/to/file.parquet');

foreach ($file->values(["column_1", "column_2"], limit: 100, offset: 1000) as $row) {
    // do something with $row
}
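For reference, the download step I mention could be done with the AWS SDK for PHP; a rough sketch (the bucket, key and credentials are the placeholders from my earlier snippet):

use Aws\S3\S3Client;

$s3 = new S3Client([
    'region'      => 'us-east-1',
    'version'     => 'latest',
    'credentials' => ['key' => 'xxxxxx', 'secret' => 'xxxxx'],
]);

// fetch the remote parquet into a local file before reading it
$s3->getObject([
    'Bucket' => 'dbs3export',
    'Key'    => $parquet,
    'SaveAs' => __DIR__ . '/local/file.parquet',
]);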

carlosMorell avatar Feb 17 '24 08:02 carlosMorell

Hey @carlosMorell, offset is definitely there; just please make sure that you are using the flow-php/parquet: ^0.6.0 version of this library.

How do I know how many rows the complete parquet has?

You can read that directly from the parquet file metadata:

(new Reader())->read(__DIR__ . '/file.parquet')->metadata()->rowsNumber()
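Combined with limit/offset, a simple paging loop could look roughly like this (column names and the chunk size of 1000 are placeholders):

use Flow\Parquet\Reader;

$file = (new Reader())->read(__DIR__ . '/file.parquet');
$total = $file->metadata()->rowsNumber();

// walk the whole file in chunks of 1000 rows
for ($offset = 0; $offset < $total; $offset += 1000) {
    foreach ($file->values(['column_1', 'column_2'], limit: 1000, offset: $offset) as $row) {
        // insert $row into MySQL here
    }
}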

norberttech avatar Feb 18 '24 20:02 norberttech

Hey, a quick update about this issue: I have been thinking a lot about how to solve it, and so far the best solution that comes to mind is to use an external filesystem like https://github.com/seaweedfs/seaweedfs. There is no way (at least none that I know of) that would let me seek through remote files with PHP. I'm gonna keep playing with this idea, because some bridge between seaweedfs and Flow would probably be required.

norberttech avatar Feb 26 '24 19:02 norberttech

I thank you for your efforts and dedication.

I've really almost got all my requirements solved. I download the Parquet file from AWS S3 and process it without problems, with the code I provided earlier in the thread.

The only drawback I've found for now is that with some Parquet files there are problems with the encoding, and the content comes back as NULL... except for the ID, which is always returned correctly; but I know the info is inside the array.

I'm investigating your documentation and options to see if I can get past this last stumbling block...

carlosMorell avatar Feb 27 '24 03:02 carlosMorell

If you could provide some more details, or even send me that Parquet file over email, I could look into this.

norberttech avatar Feb 27 '24 06:02 norberttech

hey @carlosMorell it took me a while but I have some great news!

I was bothered by this issue so much that I ended up rewriting, from scratch, a filesystem library that supports reading/writing from remote/local locations by byte ranges. Thanks to this, I was able to drastically simplify the whole process of accessing remote files.
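The core idea is to replace fseek() with explicit HTTP Range requests: instead of moving a file pointer, every read asks the server for exactly the byte range it needs. A rough PSR-7/PSR-18 style sketch (the URL, $offset, $length, and the client/factory variables are placeholders):

// ask the server for an explicit byte range instead of seeking
$request = $psr17Factory
    ->createRequest('GET', 'https://account.blob.core.windows.net/flow-php/orders.parquet')
    ->withHeader('Range', \sprintf('bytes=%d-%d', $offset, $offset + $length - 1));

$chunk = (string) $httpClient->sendRequest($request)->getBody();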

Here is a quick snippet of already working proof of concept that I'm currently working on:

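// $account, $container, $accountKey and $psr17Factory are assumed to be defined earlier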
$sdk = new BlobService(
    $config = new Configuration($account, $container),
    new Client(),
    new HttpFactory($psr17Factory, $psr17Factory),
    new AzureURLFactory(),
    new SharedKeyFactory($config, $accountKey)
);

$config = config_builder()
    ->mount(new AzureBlobFilesystem($sdk))
    ->build();

data_frame($config)
    ->read(from_csv(__DIR__ . '/orders.csv'))
    ->withEntry('discount', when(ref('discount')->isNull(), lit(0.0), ref('discount')))
    ->mode(overwrite())
    ->write(to_parquet(path('azure-blob://orders.parquet')))
    ->run();

data_frame($config)
    ->read(from_parquet('azure-blob://orders.parquet'))
    ->batchSize(10)
    ->write(to_output())
    ->run();

That particular example is based on Azure Blob Storage, but it would work exactly the same way for S3. As you can see, we won't need to register stream wrappers anymore 😁
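Purely as an illustration of what the S3 side might look like once released (the bridge class and URL scheme below are hypothetical, mirroring the Azure example):

// hypothetical S3 counterpart of the Azure example above
$config = config_builder()
    ->mount(new AwsS3Filesystem($s3Client)) // assumed bridge class
    ->build();

data_frame($config)
    ->read(from_parquet('aws-s3://orders.parquet')) // assumed scheme
    ->batchSize(10)
    ->write(to_output())
    ->run();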

Stay tuned; I'm not that far from releasing another version of Flow that is going to operate on that new filesystem abstraction.

norberttech avatar Jun 28 '24 21:06 norberttech

Hey @norberttech!! Great news! Thanks for your work and efforts! I'll take a look this week!

carlosMorell avatar Jul 01 '24 09:07 carlosMorell