SQL Pipe
🚀 The feature
Allows sourcing datasets from SQL queries. It should allow substituting different backends, e.g. Athena, Presto, Postgres, and should do smart batching to minimize the number of query requests and maximize the data returned per query.
Batching will probably be backend-dependent, e.g. Athena/Presto do not support LIMIT ... OFFSET ... pagination.
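As a rough illustration of the kind of batching meant here (not a proposed API), the sketch below groups keys into IN (...) queries instead of paginating with LIMIT/OFFSET; `run_query`, `table`, and `key_column` are placeholder names for whatever the backend client provides, and `run_query` is assumed to return a pandas DataFrame.

# Hypothetical sketch of key-based batching for a SQL-backed pipe.
# Batching by key avoids relying on LIMIT/OFFSET pagination, which not
# all backends (e.g. Athena) support.
def iter_sql_batches(keys, batch_size, run_query,
                     table="icu_data", key_column="patientid"):
    for start in range(0, len(keys), batch_size):
        chunk = keys[start:start + batch_size]
        id_list = ",".join(str(k) for k in chunk)
        query = (
            f"SELECT * FROM {table} "
            f"WHERE {key_column} IN ({id_list}) "
            f"ORDER BY {key_column}"
        )
        results = run_query(query)  # assumed to return a pandas DataFrame
        # yield one DataFrame per key so downstream pipes see per-sample data
        for key, group in results.groupby(key_column):
            yield key, group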
Motivation, pitch
Data is often stored in large data lakes on which SQL queries can be run to transform and load it. At some point this data must be turned into tensors; however, loading it takes quite a lot of care, and each query can take multiple seconds, which means it cannot be done naively per sample.
Alternatives
Could not find any existing implementations. I have written my own solution that allows the user to index into a query result, but it is very hacky.
The current solution looks something like this:
import pandas as pd


class SQLTable:
    @cached  # skip the query if the patient is already in the lmdb cache
    def get_patient(self, id) -> pd.DataFrame:
        return self.get_patients([id])

    def get_patients(self, ids) -> pd.DataFrame:
        id_list = '(' + ','.join(str(i) for i in ids) + ')'
        query = f"""
            SELECT patientid, offset, parameterid, value FROM icu_data
            WHERE patientid IN {id_list}
            ORDER BY patientid, offset, parameterid
        """
        return client.query(query)

    def prefill_cache(self, all_patients):
        # query batch_size patients at a time rather than one by one
        for patients in batch(all_patients, batch_size):
            results = self.get_patients(patients)
            # split the combined result back into one DataFrame per patient
            for patient, patient_results in results.groupby('patientid'):
                self.add_cache(patient, patient_results)
The @cached decorator checks whether the patient is already in an lmdb database and skips the query if it is.
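For reference, a minimal sketch of what such a decorator could look like (not the actual implementation), assuming an lmdb environment at a hypothetical path `patient_cache.lmdb` and pickle for serialization:

import functools
import pickle
import lmdb

# Hypothetical lmdb-backed cache keyed by patient id.
env = lmdb.open("patient_cache.lmdb", map_size=2 ** 30)

def cached(fn):
    @functools.wraps(fn)
    def wrapper(self, id):
        key = str(id).encode()
        with env.begin(write=False) as txn:
            hit = txn.get(key)
        if hit is not None:
            return pickle.loads(hit)  # cache hit: skip the SQL query
        result = fn(self, id)
        with env.begin(write=True) as txn:
            txn.put(key, pickle.dumps(result))
        return result
    return wrapper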
Naively fetching patient by patient is far too slow, but it is also not possible to fetch all patients at once, as that would cause memory issues.
Not specifically related to the SQLPipe, but for this particular use case there would also be a need to transform the data from columnar format to time-series format. This is also quite expensive at scale and might benefit from a Pipe function.
E.g. the example above returns (patientid, offset, parameterid, value) rows; these should be turned into a tensor of shape (patient x seq_len x n_parameters) with the entries filled from value. Essentially the data is queried in sparse COO format and needs to be densified. (There may be an SQL way of doing this; if so, I would love to hear it. My SQL is admittedly weak.)
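A minimal sketch of that densification step in PyTorch, assuming the patient ids, offsets, and parameter ids have already been remapped to contiguous 0-based integer indices (the function and argument names here are illustrative):

import torch

# Densify (patient_idx, offset_idx, parameter_idx, value) rows into a
# dense (n_patients x seq_len x n_parameters) tensor; missing entries become 0.
def densify(patient_idx, offset_idx, parameter_idx, values,
            n_patients, seq_len, n_parameters):
    indices = torch.stack([
        torch.as_tensor(patient_idx, dtype=torch.long),
        torch.as_tensor(offset_idx, dtype=torch.long),
        torch.as_tensor(parameter_idx, dtype=torch.long),
    ])
    sparse = torch.sparse_coo_tensor(
        indices,
        torch.as_tensor(values, dtype=torch.float32),
        size=(n_patients, seq_len, n_parameters),
    )
    return sparse.to_dense()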
Additional context
No response
This might be one OSS story integrating both TorchArrow and TorchData, where TorchArrow handles the DataFrame transformation and forwards it to the backend SQL engine. cc: @wenleix @VitalyFedyunin
Note: if we allow such a DataPipe, we would need to take special care of the nondeterministic row ordering of most SQL database engines.
Yeah, it makes sense to use the Arrow memory layout to fetch data from the warehouse; for example, Snowflake's Python client supports fetching result batches as Arrow tables: https://docs.snowflake.com/en/user-guide/python-connector-distributed-fetch.html#retrieving-the-list-of-result-batches
The data buffer in an Arrow Array already implements the Python buffer protocol, so it can be converted to a Tensor via torch.frombuffer, e.g. https://github.com/wenleix/axolotls/blob/ccce45361d4e78c530d23d8e70e7a2686315cd81/axolotls/string_column.py#L98-L102
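For illustration, a small sketch of that conversion, assuming a fixed-width primitive Arrow array with no nulls (for such arrays the second entry of Array.buffers() is the value buffer):

import pyarrow as pa
import torch

# Build a small float32 Arrow array and view its value buffer as a Tensor.
arr = pa.array([1.0, 2.0, 3.0], type=pa.float32())
value_buf = arr.buffers()[1]  # buffers()[0] is the validity bitmap (None here)
t = torch.frombuffer(value_buf, dtype=torch.float32)
print(t)  # tensor([1., 2., 3.])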
cc @dracifer, @bearzx
Agreed, it should be achievable. But in that case, users would be using two languages (SQL and DataFrame-style) to describe transformation logic; is that a reasonable user experience?