
implement one-stage group-by for data.table

Open xiaodaigh opened this issue 6 years ago • 7 comments

xiaodaigh avatar Dec 18 '19 22:12 xiaodaigh

In my case, the data is already aggregated in each chunk. Thus, with a first-stage group-by via data.table, would all the data be loaded into RAM? (This question is just for understanding the logic.)

ThoDuyNguyen avatar Dec 19 '19 00:12 ThoDuyNguyen

There seems to be some weird bug when overloading [[.disk.frame, hence this can't easily be progressed. Submitted a bug report to future.apply; hopefully they are related.

Unpinning this issue for now as there is no clear way forward.

xiaodaigh avatar Dec 22 '19 03:12 xiaodaigh

Could you expand on this? My understanding is that you need a hardby = ... argument within the [.disk.frame call. Relatedly, you could use NSE to capture the by argument, do a hardby and shard based on the grouping variables, and then do the future.apply::future_lapply call.

After looking into your approach for the dplyr verbs, it would also be possible to use NSE to combine the stages, although it would not be particularly fun. But if you are game for PRs, I'd be happy to assist. Here's a very rough sketch, although I believe we could develop a more generalized framework.

my_NSE = function(df, ...) {
    
    ## Stage 1: run the user's query per chunk via `[.disk.frame`.
    res = df[...]
    
    ## Capture the unevaluated `...` arguments (i, j, by).
    dots = match.call(expand.dots = FALSE)$...
    dot_names = names(dots)
    
    ## Locate the `by` expression, whether supplied by name or by position.
    do_one_stage = TRUE
    if (any(dot_names == 'by')) 
        by_sub = dots$by
    else if (length(dots) >= 3L)
        by_sub = dots[[3L]]
    else 
        do_one_stage = FALSE
    
    if (do_one_stage) {
        sub_j = if (any(dot_names == 'j')) dots$j else dots[[2L]]
        ## Only `.N` is translated so far: per-chunk counts are re-summed.
        if (is.name(sub_j) && sub_j == quote(.N)) 
            second_j = quote(.(N = sum(N)))
        else
            return(res) ## no known second stage; return the chunked aggregation as-is.
    
        ## Stage 2: aggregate the combined per-chunk results.
        eval(call('[', res, j = second_j, by = by_sub))
    }
    else
        res
}
iris.df = as.disk.frame(iris)
my_NSE(iris.df, , j = .N, by = Species)
iris.df = as.disk.frame(iris)
my_NSE(iris.df, , j = .N, by = Species)

##      Species     N
##       <fctr> <int>
##1:     setosa    50
##2: versicolor    50
##3:  virginica    50

Note, this takes about 40 ms on my computer.

ColeMiller1 avatar Feb 25 '21 05:02 ColeMiller1

do a hardby and shard based on the grouping variables, and then do the future.apply::future_lapply call.

The issue is that hard_by is slow. For many operations, like mean and sum, you only need to perform a two-stage group-by, where the first stage is performed per chunk and a second stage collects the results from all the chunks. That would be the ideal approach and would work for many functions.
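The two-stage idea can be illustrated with plain data.table, no sharding needed. This is only a minimal sketch: the chunk list and column names below are hypothetical, not disk.frame's API. Note how mean is reconstructed from per-chunk sums and counts rather than averaging per-chunk means.

```r
library(data.table)

## Hypothetical stand-in for a disk.frame: two chunks of one logical table.
chunks = list(
    data.table(g = c("a", "a", "b"), x = c(1, 2, 3)),
    data.table(g = c("a", "b", "b"), x = c(4, 5, 6))
)

## Stage 1: aggregate within each chunk (in practice this would run in
## parallel, e.g. via future.apply::future_lapply).
partial = lapply(chunks, function(ch) ch[, .(s = sum(x), n = .N), by = g])

## Stage 2: combine the per-chunk partials and finish the aggregation.
## sum is re-summed; mean is total sum / total count.
combined = rbindlist(partial)[, .(sum_x = sum(s), mean_x = sum(s) / sum(n)), by = g]
combined
##    g sum_x   mean_x
## 1: a     7 2.333333
## 2: b    14 4.666667
```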

Actually, instead of a PR, are you able to create a new package like disk.frame.dt and store all the data.table implementation there? I want to break disk.frame into smaller, more independent pieces, and I think supporting both dplyr and data.table syntax in one package is not going to be a good approach going forward. I would be happy to review the package if you do create one.

xiaodaigh avatar Feb 25 '21 11:02 xiaodaigh

I might be able to create a package. Would you still have a [ method within this package?

ColeMiller1 avatar Feb 25 '21 12:02 ColeMiller1

I think it would not, once an independent package exists; the [ method will be migrated out.

xiaodaigh avatar Feb 26 '21 02:02 xiaodaigh

Ok. I will create a repo this weekend and start. For now the goal is an NSE equivalent of what you’ve implemented for the dplyr verbs.

ColeMiller1 avatar Feb 26 '21 22:02 ColeMiller1