
Writing results to a DataFrame works with apply; however, the same doesn't work with parallel_apply. Need help with the same.

Open ssakthivinayagam opened this issue 5 years ago • 2 comments

Please find below the snippet with which we tried populating the output using both the apply and parallel_apply functions. The parallel_apply version does not seem to work. It would help if you could shed some light on this.

import pandas as pd
import numpy as np

def myFunction(array):

    global index
    global outputDF

    # Some random operation
    outputDF.loc[index, 'min'] = np.nanmin(array)
    outputDF.loc[index, 'max'] = np.nanmax(array)
    outputDF.loc[index, 'mean'] = np.nanmean(array)

    index += 1
    # Returning a useless variable
    return 0
if __name__ == "__main__":

    # pandarallel must be initialized before parallel_apply can be used
    from pandarallel import pandarallel
    pandarallel.initialize()

    # A random window size
    windowSize = 10

    # Preparing some random input data
    inputDF = pd.DataFrame({ 'randomValue': np.random.rand(500) })


    # Pre-Allocate memory
    outputDF = pd.DataFrame({ 'min': [np.nan] * len(inputDF),
                              'max': [np.nan] * len(inputDF),
                              'mean': [np.nan] * len(inputDF)
                              })   

    # Set the starting index (due to the window size)
    d = (windowSize - 1) / 2
    index = int(np.floor(d))

    # Do the rolling apply here
    apply_result = inputDF['randomValue'].rolling(window=windowSize, center=True).apply(myFunction, args=())
    parallel_result = inputDF['randomValue'].rolling(window=windowSize, center=True).parallel_apply(myFunction, args=())

    # assert index + int(np.ceil(d)) == len(inputDF), 'Length mismatch'

    outputDF.index = inputDF.index

    # Optional : Clean the nulls
    outputDF.dropna(inplace=True)

    print(outputDF)
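For reference, the same three columns can be computed without any globals by aggregating directly on the rolling window. This matters for parallelism: parallel_apply runs myFunction in separate worker processes, so each worker only sees its own copy of a global outputDF and its writes never reach the parent process. A minimal side-effect-free sketch:

```python
import numpy as np
import pandas as pd

windowSize = 10
inputDF = pd.DataFrame({'randomValue': np.random.rand(500)})

# Aggregate min/max/mean over a centered rolling window -- no shared
# state, so the result does not depend on which process runs each window
outputDF = (
    inputDF['randomValue']
    .rolling(window=windowSize, center=True)
    .agg(['min', 'max', 'mean'])
)

print(outputDF.dropna().head())
```

Because there is no shared state, this form works the same whether the windows are evaluated serially or in parallel.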

ssakthivinayagam avatar Feb 18 '21 11:02 ssakthivinayagam

I took some time to investigate, and I think there is a problem with using pandarallel on rolling windows with center=True:

As far as I can tell, using .rolling(center=True).parallel_apply(func) on a pd.Series or a single-column pd.DataFrame distributes the rolling windows across workers, and (I guess) windows at the start of each new chunk are "dropped", coming back as NaN. Notably, this problem does not occur with center=False. Unfortunately this part of the codebase is somewhat alien to me, so I don't know how to fix it.

More minimalistic example:

import pandas as pd
import numpy as np
from pandarallel import pandarallel

myFunction = lambda x: 1

windowSize = 10

inputDF = pd.DataFrame({
    'a': np.random.rand(500)
    })

%time apply=inputDF.rolling(window=windowSize, center=True).apply(myFunction)

print(apply.index[apply.isna().all(axis=1)])

for i in [2, 4, 6, 8]:
    pandarallel.initialize(nb_workers=i, progress_bar=False)
    %time parallel_apply=inputDF.rolling(window=windowSize, center=True).parallel_apply(myFunction)
    print(parallel_apply.index[parallel_apply.isna().all(axis=1)])

Output:

CPU times: user 6.82 ms, sys: 172 µs, total: 7 ms
Wall time: 7 ms
Int64Index([0, 1, 2, 3, 4, 496, 497, 498, 499], dtype='int64')
INFO: Pandarallel will run on 2 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.
CPU times: user 3.6 ms, sys: 9.14 ms, total: 12.7 ms
Wall time: 34.2 ms
Int64Index([0, 1, 2, 3, 4, 246, 247, 248, 249, 496, 497, 498, 499], dtype='int64')
INFO: Pandarallel will run on 4 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.
CPU times: user 3.06 ms, sys: 9.1 ms, total: 12.2 ms
Wall time: 32.4 ms
Int64Index([  0,   1,   2,   3,   4, 121, 122, 123, 124, 246, 247, 248, 249,
            371, 372, 373, 374, 496, 497, 498, 499],
           dtype='int64')
INFO: Pandarallel will run on 6 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.
CPU times: user 4.4 ms, sys: 11.6 ms, total: 16 ms
Wall time: 42.3 ms
Int64Index([  0,   1,   2,   3,   4,  80,  81,  82,  83, 164, 165, 166, 167,
            247, 248, 249, 250, 330, 331, 332, 333, 413, 414, 415, 416, 496,
            497, 498, 499],
           dtype='int64')
INFO: Pandarallel will run on 8 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.
CPU times: user 4.35 ms, sys: 12.9 ms, total: 17.3 ms
Wall time: 43.7 ms
Int64Index([  0,   1,   2,   3,   4,  59,  60,  61,  62, 122, 123, 124, 125,
            185, 186, 187, 188, 248, 249, 250, 251, 310, 311, 312, 313, 372,
            373, 374, 375, 434, 435, 436, 437, 496, 497, 498, 499],
           dtype='int64')

till-m avatar Sep 06 '22 15:09 till-m

I got the same issue, and no idea either.

  • df.apply always works, but once swapped to parallel_apply (or swifter) it gets stuck, running forever.
  • It is not related to settings; I checked progress_bar=False, nb_workers=1, and use_memory_fs=False one by one.
  • It is not related to the function's return value either: as in the example here, I reduced the function to an empty body that returns 0.
  • The main issue is that it just freezes; it is very hard to debug without any error messages.
# the minimal case
from functools import partial

def get_values_test(x, ds, time_list):
    # df_x = ds.sel(longitude=x['longitude'], latitude=x['latitude'], aspect=x['aspect'], method='nearest')  # original function
    return 0, 0

def coupling_xarray_dataframe_wind_test(ds, df, time_list):
    func = partial(get_values_test, ds=ds, time_list=time_list)
    return df[['longitude', 'latitude', 'aspect']].parallel_apply(func, axis=1, result_type="expand")  # stuck

If I use df.swifter.apply(func, axis=1, result_type="expand"), it works. However, it too stops running once I put the original function back.
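When parallel_apply appears to hang like this, one thing worth checking is how expensive the object captured in the partial is to serialize: pandarallel ships the function, including the bound ds, to each worker process (the INFO log above mentions standard multiprocessing pipe transfer), and a large xarray Dataset can make that step take a very long time with no output at all. A minimal sketch of the check, using a plain NumPy array as a hypothetical stand-in for ds:

```python
import pickle

import numpy as np

# Hypothetical stand-in for the xarray Dataset bound into the partial
ds = {'grid': np.random.rand(1000, 1000)}

payload = pickle.dumps(ds)
print(f"bound argument pickles to {len(payload) / 1e6:.1f} MB")
```

If this turns out to be large, a common workaround is to open the dataset inside the worker function rather than binding it as an argument.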

liuh886 avatar Mar 16 '23 10:03 liuh886