Writing results into a DataFrame works with apply, but the same approach does not work with parallel_apply, and we need help with it.
Below is the snippet with which we tried populating the output using both the apply and parallel_apply functions. The parallel_apply call does not seem to work; it would help if you could decipher this.
import numpy as np
import pandas as pd
from pandarallel import pandarallel

def myFunction(array):
    global index
    global outputDF
    # Some random operation: write this window's stats into the current output row
    outputDF.loc[index, 'min'] = np.nanmin(array)
    outputDF.loc[index, 'max'] = np.nanmax(array)
    outputDF.loc[index, 'mean'] = np.nanmean(array)
    index += 1
    # rolling.apply expects a scalar return value, so return a dummy
    return 0

if __name__ == "__main__":
    pandarallel.initialize()
    # A random window size
    windowSize = 10
    # Preparing some random input data
    inputDF = pd.DataFrame({'randomValue': np.random.rand(500)})
    # Pre-allocate memory for the output
    outputDF = pd.DataFrame({'min': [np.nan] * len(inputDF),
                             'max': [np.nan] * len(inputDF),
                             'mean': [np.nan] * len(inputDF)})
    # Compute the starting index (due to the centred window)
    d = (windowSize - 1) / 2
    index = int(np.floor(d))
    # Do the rolling apply here
    apply = inputDF['randomValue'].rolling(window=windowSize, center=True).apply(myFunction, args=())
    parallel_apply = inputDF['randomValue'].rolling(window=windowSize, center=True).parallel_apply(myFunction, args=())
    # assert index + int(np.ceil(d)) == len(inputDF), 'Length mismatch'
    outputDF.index = inputDF.index
    # Optional: drop the rows that were never filled
    outputDF.dropna(inplace=True)
    print(outputDF)
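A side note on the snippet above: pandarallel executes the applied function in separate worker processes, so writes to the module-level outputDF and index happen in each worker's own copy and never reach the parent process. The global-state pattern therefore cannot work under parallel_apply even once the chunking issue is fixed. For these particular statistics no shared state is needed at all; a minimal sketch of the same computation using pandas' built-in rolling aggregations (reusing the names from the snippet):

```python
import numpy as np
import pandas as pd

windowSize = 10
inputDF = pd.DataFrame({'randomValue': np.random.rand(500)})

# min/max/mean of each centred window, computed by pandas directly:
# no shared state, so nothing depends on which process does the work
outputDF = (inputDF['randomValue']
            .rolling(window=windowSize, center=True)
            .agg(['min', 'max', 'mean']))

# Optional: drop the edge rows that have no full window
outputDF.dropna(inplace=True)
print(outputDF)
```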
I took some time to investigate and I think there is a problem with using pandarallel on rolling windows with center=True:
As far as I can tell, calling .rolling(center=True).parallel_apply(func) on a pd.Series or on a pd.DataFrame with a single column distributes the rolling windows across workers, and (I guess) the windows at the edges of each worker's chunk are "dropped", producing extra NaN runs at the chunk boundaries.
Notably, this problem does not occur with center=False. Unfortunately this part of the codebase is somewhat alien to me, so I don't know how to fix it.
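The extra NaN blocks in the output further down are consistent with each worker's chunk being rolled as an independent series. The following is my own rough simulation of that suspected mechanism, not pandarallel's actual splitting code (the exact indices differ from pandarallel's output, which appears to hand each worker some overlap, but the per-chunk edge NaNs are the same effect):

```python
import numpy as np
import pandas as pd

s = pd.Series(np.random.rand(500))
windowSize = 10

# Reference: rolling over the whole series leaves NaN only at the true
# edges (windowSize - 1 = 9 positions in total)
whole = s.rolling(window=windowSize, center=True).apply(lambda x: 1)

# Simulated 2-worker split: roll each half independently, then concatenate.
# Each chunk grows its own NaN edges, doubling the NaN count.
mid = len(s) // 2
parts = [c.rolling(window=windowSize, center=True).apply(lambda x: 1)
         for c in (s.iloc[:mid], s.iloc[mid:])]
simulated = pd.concat(parts)

print(int(whole.isna().sum()))      # 9: NaNs at the real edges only
print(int(simulated.isna().sum()))  # 18: extra NaN runs at the chunk boundary
```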
More minimalistic example:
import pandas as pd
import numpy as np
from pandarallel import pandarallel

myFunction = lambda x: 1
windowSize = 10
inputDF = pd.DataFrame({
    'a': np.random.rand(500)
})

%time apply = inputDF.rolling(window=windowSize, center=True).apply(myFunction)
print(apply.index[apply.isna().all(axis=1)])

for i in [2, 4, 6, 8]:
    pandarallel.initialize(nb_workers=i, progress_bar=False)
    %time parallel_apply = inputDF.rolling(window=windowSize, center=True).parallel_apply(myFunction)
    print(parallel_apply.index[parallel_apply.isna().all(axis=1)])
Output:
CPU times: user 6.82 ms, sys: 172 µs, total: 7 ms
Wall time: 7 ms
Int64Index([0, 1, 2, 3, 4, 496, 497, 498, 499], dtype='int64')
INFO: Pandarallel will run on 2 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.
CPU times: user 3.6 ms, sys: 9.14 ms, total: 12.7 ms
Wall time: 34.2 ms
Int64Index([0, 1, 2, 3, 4, 246, 247, 248, 249, 496, 497, 498, 499], dtype='int64')
INFO: Pandarallel will run on 4 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.
CPU times: user 3.06 ms, sys: 9.1 ms, total: 12.2 ms
Wall time: 32.4 ms
Int64Index([ 0, 1, 2, 3, 4, 121, 122, 123, 124, 246, 247, 248, 249,
371, 372, 373, 374, 496, 497, 498, 499],
dtype='int64')
INFO: Pandarallel will run on 6 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.
CPU times: user 4.4 ms, sys: 11.6 ms, total: 16 ms
Wall time: 42.3 ms
Int64Index([ 0, 1, 2, 3, 4, 80, 81, 82, 83, 164, 165, 166, 167,
247, 248, 249, 250, 330, 331, 332, 333, 413, 414, 415, 416, 496,
497, 498, 499],
dtype='int64')
INFO: Pandarallel will run on 8 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.
CPU times: user 4.35 ms, sys: 12.9 ms, total: 17.3 ms
Wall time: 43.7 ms
Int64Index([ 0, 1, 2, 3, 4, 59, 60, 61, 62, 122, 123, 124, 125,
185, 186, 187, 188, 248, 249, 250, 251, 310, 311, 312, 313, 372,
373, 374, 375, 434, 435, 436, 437, 496, 497, 498, 499],
dtype='int64')
I got the same issue, and I have no idea either.
- df.apply always works, but once swapped for parallel_apply or swifter it gets stuck, running forever.
- It is not related to the settings; I checked progress_bar=False, nb_workers=1, and use_memory_fs=False one by one.
- It is not related to the function's return value either: as in the example here, I reduced the function to an empty body returning a constant.
- The main issue is that it just freezes; it is very hard to debug without any error message.
# The minimal case
from functools import partial

def get_values_test(x, ds, time_list):
    # df_x = ds.sel(longitude=x['longitude'], latitude=x['latitude'], aspect=x['aspect'], method='nearest')  # original function
    return 0, 0

def coupling_xarray_dataframe_wind_test(ds, df, time_list):
    func = partial(get_values_test, ds=ds, time_list=time_list)
    return df[['longitude', 'latitude', 'aspect']].parallel_apply(func, axis=1, result_type="expand")  # stuck

If I use df.swifter.apply(func, axis=1, result_type="expand") instead, it works with this stub; however, it also hangs once I put the original function back.
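One possible direction for the freeze (my own workaround sketch, not a pandarallel fix): when the xarray dataset is baked into the function with partial, it has to be pickled and shipped along with the work, which can hang or crawl for large objects. Plain multiprocessing lets you send the large object to each worker exactly once via a Pool initializer. get_values_test is replaced here by a trivial stand-in, and the row chunking is done manually:

```python
import multiprocessing as mp

import numpy as np
import pandas as pd

_ds = None  # per-worker global, set once by the initializer


def _init_worker(ds):
    # Runs once in each worker process; avoids re-pickling ds per task
    global _ds
    _ds = ds


def _process_chunk(chunk):
    # Stand-in for get_values_test; _ds is available here without shipping
    # it with every chunk
    return chunk.apply(lambda row: (0, 0), axis=1, result_type="expand")


def parallel_rows(df, ds, nb_workers=4):
    # Contiguous row chunks, one batch of work per worker
    bounds = np.linspace(0, len(df), nb_workers + 1, dtype=int)
    chunks = [df.iloc[bounds[i]:bounds[i + 1]] for i in range(nb_workers)]
    with mp.Pool(nb_workers, initializer=_init_worker, initargs=(ds,)) as pool:
        return pd.concat(pool.map(_process_chunk, chunks))


if __name__ == "__main__":
    df = pd.DataFrame({'longitude': np.random.rand(100),
                       'latitude': np.random.rand(100),
                       'aspect': np.random.rand(100)})
    out = parallel_rows(df, ds={'dummy': 1})
    print(out.shape)  # one row per input row, two result columns
```

If the real dataset is still too big to pickle once per worker, loading it inside _init_worker (e.g. opening the file path instead of passing the object) sidesteps the transfer entirely.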